Version: 1.2.0

Spark pipeline example
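This example ingests a CSV source into an input feature set, derives a new feature set by applying a Spark pipeline transformation, waits for the automatically started ingest job to finish, and then retrieves and downloads the result.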

from featurestore import Client, CSVFile, SparkPipeline
from featurestore.core.job_types import INGEST

# Initialise feature store client
client = Client("ip:port")
client.auth.login()

# Create a project
project = client.projects.create("demo")

# Create source for input feature set
csv = CSVFile("wasbs://featurestore@featurestorekuba.blob.core.windows.net/training.csv")
# Extract schema
schema = client.extract_schema_from_source(csv)
# Register input feature set
input_fs = project.feature_sets.register(schema, "input")
# Ingest the input feature set
input_fs.ingest(csv)

# Define Spark pipeline transformation
spark_pipeline = SparkPipeline("pipeline_path")
# Extract schema
schema = client.extract_derived_schema([input_fs], spark_pipeline)
# Register the feature set
my_feature_set = project.feature_sets.register(schema, "feature_set_name", primary_key=["state"])

# Get the automatically started ingest job and wait for it to finish
auto_ingest_job = my_feature_set.get_active_jobs(INGEST)[0]
auto_ingest_job.wait_for_result()

# Retrieve the feature set and download the data locally
ref = my_feature_set.retrieve()
data_dir = ref.download()
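
The download call writes the retrieved files to a local directory. As a minimal sketch (not part of the example above), assuming the files are stored as Parquet and that data_dir holds the directory path returned by download(), they can be inspected with pandas:

import pandas as pd
from pathlib import Path

# Load every downloaded Parquet file into a single DataFrame
frames = [pd.read_parquet(p) for p in Path(data_dir).glob("*.parquet")]
df = pd.concat(frames, ignore_index=True)
print(df.head())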
