Version: 1.2.0

Feature set schedule API

You can schedule ingestion jobs in Feature Store by using the scheduling API available on feature sets.

Schedule a new task

To create a new scheduled task, first obtain the feature set and then call schedule_ingest on it:

fs.schedule_ingest("task_name", source, schedule="0 2 * * *", description="", credentials=None, allowed_failures=2)

The schedule argument uses cron format. For example, 0 2 * * * runs the task every day at 2:00 AM.
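To see what a cron expression such as 0 2 * * * means in practice, the sketch below computes upcoming run times. It is a simplified illustration of standard 5-field cron syntax (only * and comma-separated numbers; no ranges, steps, or the special day-of-month/day-of-week OR rule), not Feature Store's own scheduler. The helpers cron_matches and next_runs are hypothetical.

```python
from datetime import datetime, timedelta


def field_matches(spec: str, value: int) -> bool:
    """Match one cron field; supports only '*' and comma-separated integers."""
    if spec == "*":
        return True
    return value in {int(part) for part in spec.split(",")}


def cron_matches(expression: str, moment: datetime) -> bool:
    """Return True if `moment` (to the minute) matches the 5-field cron expression."""
    parts = expression.split()
    if len(parts) != 5:
        raise ValueError(f"expected 5 cron fields, got {len(parts)}")
    # Field order: minute, hour, day of month, month, day of week.
    # Cron numbers weekdays Sunday=0; isoweekday() gives Monday=1..Sunday=7.
    values = (moment.minute, moment.hour, moment.day, moment.month, moment.isoweekday() % 7)
    return all(field_matches(spec, value) for spec, value in zip(parts, values))


def next_runs(expression: str, start: datetime, count: int) -> list:
    """Brute-force the next `count` run times by stepping minute by minute."""
    moment = start.replace(second=0, microsecond=0) + timedelta(minutes=1)
    runs = []
    while len(runs) < count:
        if cron_matches(expression, moment):
            runs.append(moment)
        moment += timedelta(minutes=1)
    return runs


# The two runs following noon on 2024-01-01 for "every day at 2 AM":
for run in next_runs("0 2 * * *", datetime(2024, 1, 1, 12, 0), 2):
    print(run)
```

Stepping minute by minute is slow but obviously correct, which is the point of an illustration; a real scheduler computes the next match directly.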

The allowed_failures argument sets how many times the task may fail before the next failure deschedules it to save resources. A negative value allows any number of failures. The default value is 2.
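The rule above can be written as a small predicate. This is a hypothetical model for illustration only: is_descheduled is not part of the Feature Store client, and it assumes failures are counted cumulatively for the task.

```python
def is_descheduled(failure_count: int, allowed_failures: int = 2) -> bool:
    """Hypothetical model of the documented allowed_failures rule.

    The task tolerates `allowed_failures` failures; the next failure after
    that deschedules it. A negative limit means failures never deschedule it.
    """
    if allowed_failures < 0:
        return False
    return failure_count > allowed_failures


# With the default limit of 2, the third failure deschedules the task.
for failures in (1, 2, 3):
    print(failures, is_descheduled(failures))
```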

Note

Scheduling an ingestion task is supported for all data sources except Spark DataFrames. The data source used for scheduling must be stored in a permanently accessible location, which is not the case for a Spark DataFrame because it lives in the memory of a Spark session.

Listing scheduled tasks

The list method does not return the tasks directly. Instead, it returns an iterator that fetches the tasks lazily.

fs.schedule.tasks()
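Because the result is a lazy iterator, you can stop consuming it early. The sketch below collects at most limit paused tasks using only the documented is_paused() method. The helper first_paused is hypothetical, and it assumes each item yielded by fs.schedule.tasks() is a task object like the one returned by fs.schedule.get().

```python
from itertools import islice


def first_paused(tasks, limit: int = 10) -> list:
    """Collect up to `limit` paused tasks from a lazy iterator.

    Iteration stops as soon as `limit` paused tasks are found, so the
    remaining tasks are never fetched.
    """
    paused = (task for task in tasks if task.is_paused())
    return list(islice(paused, limit))


# Usage against a real feature set (not executed here):
# paused_tasks = first_paused(fs.schedule.tasks(), limit=5)
```

Wrapping the iterator in list() without islice would instead fetch every task, which is fine for small projects but defeats the lazy loading.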

Obtaining a task

task = fs.schedule.get("task_id")

Examining task executions

Basic information about task executions is available from the execution history. It provides the start and end times of each scheduled run and its final (job) status. The special status 'Created' indicates that a scheduled run has started but not yet finished. The accompanying job ID can be used to access the job that performed a past execution.

# Each record holds the start/end times, final status, and job ID of one run
for execution_record in task.execution_history():
    print(execution_record)
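Execution records can be post-processed, for example to find runs that are still in progress or to compute run durations. The exact attributes of a record are not documented here, so the sketch below uses a hypothetical ExecutionRecord stand-in with assumed field names (start_time, end_time, status, job_id); adapt them to what print(execution_record) actually shows. Only the 'Created' status value comes from this page.

```python
from dataclasses import dataclass
from datetime import datetime
from typing import Iterable, List, Optional


@dataclass
class ExecutionRecord:
    """Hypothetical stand-in for an execution record; real field names may differ."""
    start_time: datetime
    end_time: Optional[datetime]
    status: str
    job_id: str


def running_job_ids(records: Iterable[ExecutionRecord]) -> List[str]:
    """Job IDs of runs that started but have not finished (status 'Created')."""
    return [r.job_id for r in records if r.status == "Created"]


def durations_seconds(records: Iterable[ExecutionRecord]) -> List[float]:
    """Duration of each finished run, in seconds."""
    return [
        (r.end_time - r.start_time).total_seconds()
        for r in records
        if r.end_time is not None
    ]
```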

Obtaining a lazy ingest task

The lazy ingest task lets you defer ingestion of a feature set's data to a later time instead of ingesting it immediately. Each major version of a feature set can contain only one lazy ingest task. To obtain it, run:

task = fs.schedule.get_lazy_ingest_task()

Deleting a task

task = fs.schedule.get("task_id")
task.delete()

Updating task fields

To update a field, call its setter by assigning a new value to it, for example:

task = fs.schedule.get("task_id")
task.description = "new description"
task.schedule = "0 6 * * *"
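Because updates are plain attribute assignments, it can help to sanity-check a new cron string on the client before assigning it. The helper below is hypothetical (set_schedule is not part of the Feature Store API) and checks only that the expression has five fields, not full cron syntax; the assignment itself uses the documented schedule setter.

```python
def set_schedule(task, cron_expression: str) -> None:
    """Hypothetical helper: check the cron field count, then call the schedule setter."""
    fields = cron_expression.split()
    if len(fields) != 5:
        raise ValueError(f"expected 5 cron fields, got {len(fields)}: {cron_expression!r}")
    task.schedule = cron_expression  # assignment invokes the documented setter


# Usage (not executed here):
# task = fs.schedule.get("task_id")
# set_schedule(task, "0 6 * * *")
```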

Controlling task liveness

If a task was scheduled with a limit on failures and that limit is exceeded, Feature Store automatically pauses the task to save resources. To check whether a task is paused, use the following call:

task = fs.schedule.get("task_id")
task.is_paused()

A task can also be paused manually:

task.pause()

A paused task can be rescheduled by calling the resume() method. The method takes an optional allowed_failures argument that sets a new limit on allowed failures. If the argument is not provided, the existing limit stays unchanged.

task.resume(allowed_failures=None)

To check the current limit on allowed failures, read the allowed_failures attribute:

task.allowed_failures
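The liveness calls above combine naturally into a helper that resumes a task only when it is paused. This sketch uses only the documented is_paused(), resume(allowed_failures=...), and allowed_failures members; the name ensure_running is hypothetical.

```python
def ensure_running(task, allowed_failures=None) -> bool:
    """Resume `task` if it is paused; return True if a resume was issued.

    Passing allowed_failures=None keeps the task's existing failure limit,
    matching the documented behavior of resume().
    """
    if task.is_paused():
        task.resume(allowed_failures=allowed_failures)
        return True
    return False


# Usage (not executed here):
# task = fs.schedule.get("task_id")
# if ensure_running(task, allowed_failures=5):
#     print("Resumed; current limit:", task.allowed_failures)
```

Checking is_paused() first avoids issuing a resume (and possibly overwriting the limit) for a task that is already running.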

Starting the lazy ingest task

If a lazy ingest task exists on a feature set, it runs automatically on the first retrieve. You can also start it explicitly:

fs.schedule.start_lazy_ingest_task()

Note

If an ingest has already been executed on the feature set version, the lazy ingest task will not run.

Timezone configuration for tasks

By default, Feature Store clients use the system timezone. You can change the timezone, for example:

import os, time
os.environ['TZ'] = 'UTC-05:00'
if hasattr(time, "tzset"):
    time.tzset()  # Unix only: re-read TZ for this process's time functions

Note

Supported timezone formats are UTC-XX:XX, UTC+XX:XX, or timezone names supported by Python and Scala.
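The timezone determines when a cron schedule fires relative to UTC. The sketch below works through that arithmetic for 0 2 * * * under UTC-05:00. It assumes the label follows the ISO 8601 sign convention (UTC-05:00 means five hours behind UTC); how the Feature Store client parses the string is not stated here, and POSIX TZ strings use the opposite sign. The helpers parse_utc_offset and to_utc are hypothetical.

```python
from datetime import datetime, timedelta, timezone


def parse_utc_offset(label: str) -> timedelta:
    """Parse 'UTC+HH:MM' or 'UTC-HH:MM' into a timedelta (ISO 8601 sign convention)."""
    if len(label) != 9 or not label.startswith("UTC") or label[3] not in "+-" or label[6] != ":":
        raise ValueError(f"unsupported offset label: {label!r}")
    sign = 1 if label[3] == "+" else -1
    hours, minutes = int(label[4:6]), int(label[7:9])
    return sign * timedelta(hours=hours, minutes=minutes)


def to_utc(hour: int, minute: int, offset: timedelta, year: int, month: int, day: int) -> datetime:
    """Convert a wall-clock run time at a fixed UTC offset to UTC."""
    local_time = datetime(year, month, day, hour, minute, tzinfo=timezone(offset))
    return local_time.astimezone(timezone.utc)


# "0 2 * * *" evaluated in UTC-05:00 fires at 07:00 UTC.
run = to_utc(2, 0, parse_utc_offset("UTC-05:00"), 2024, 1, 1)
print(run.isoformat())  # 2024-01-01T07:00:00+00:00
```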

