Spark pipeline example
- Python
- Scala
from featurestore import Client, CSVFile, SparkPipeline
from featurestore.core.job_types import INGEST
# Initialise feature store client
client = Client("ip:port")
client.auth.login()
# Set project specifics
project = client.projects.create("demo")
# Create source for input feature set
csv = CSVFile("wasbs://featurestore@featurestorekuba.blob.core.windows.net/training.csv")
# Extract schema
schema = client.extract_schema_from_source(csv)
# Register input feature set
input_fs = project.feature_sets.register(schema, "input")
# Ingest the input feature set
input_fs.ingest(csv)
# Define Spark pipeline transformation
spark_pipeline = SparkPipeline("pipeline_path")
# Extract the derived schema produced by applying the pipeline to the input feature set
# (rebinds `schema`; the input feature set's schema is no longer needed)
schema = client.extract_derived_schema([input_fs], spark_pipeline)
# Register the feature set
my_feature_set = project.feature_sets.register(schema, "feature_set_name", primary_key=["state"])
# Get ingest job
# NOTE(review): `[0]` assumes at least one active INGEST job exists for this feature set
# (presumably started automatically by registering a derived feature set) — confirm.
auto_ingest_job = my_feature_set.get_active_jobs(INGEST)[0]
auto_ingest_job.wait_for_result()
# Retrieve feature set
ref = my_feature_set.retrieve()
ref.download()
import ai.h2o.featurestore.Client
import ai.h2o.featurestore.core.sources.CSVFile
import ai.h2o.featurestore.core.transformations.SparkPipeline
import ai.h2o.featurestore.core.JobTypes
// Initialise feature store client
val client = Client("url")
client.auth.login()
// Set project specifics
val project = client.projects.create("demo")
// Create source for input feature set (CSVFile must be imported; it was missing)
val csv = CSVFile("wasbs://featurestore@featurestorekuba.blob.core.windows.net/training.csv")
// Extract schema
val schema = client.extractSchemaFromSource(csv)
// Register input feature set
val inputFs = project.featureSets.register(schema, "input")
// Ingest the input feature set
inputFs.ingest(csv)
// Define Spark pipeline transformation
val sparkPipeline = SparkPipeline("pipeline_path")
// Extract the derived schema produced by applying the pipeline to the input feature set
// (a new name is needed because `schema` is an immutable val)
val schemaSpark = client.extractDerivedSchema(Seq(inputFs), sparkPipeline)
// Register the feature set
val myFeatureSet = project.featureSets.register(schemaSpark, "featureSetName", primaryKey = Seq("state"))
// Get ingest job
// NOTE(review): `.head` assumes at least one active INGEST job exists for this feature set
// (presumably started automatically by registration) — confirm; it throws if the result is empty.
val autoIngestJob = myFeatureSet.getActiveJobs(JobTypes.INGEST).head
autoIngestJob.waitForResult()
// Retrieve feature set
val ref = myFeatureSet.retrieve()
ref.download()
Feedback
- Submit and view feedback for this page
- Send feedback about H2O Feature Store to cloud-feedback@h2o.ai