Triggers for data workflows

Use triggers to automate your data workflow runs. A data workflow can have one or more triggers associated with it, each uniquely identified by an external ID.

Pass data to workflows

When a trigger fires, it creates a workflow execution and passes data through the workflow input. This data becomes accessible to all tasks in the workflow through dynamic references using the ${workflow.input} syntax. The workflow input can contain a combination of:

  • Static input: Data you define when creating the trigger.

  • Dynamic input: Data collected by the trigger. For data modeling triggers, the matched instances are available as ${workflow.input.items}.

  • System input: Metadata added by the system, for instance ${workflow.input.version}.
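
For illustration, the merged input seen by the workflow could look like the following, with each top-level key reachable as ${workflow.input.<key>}. This is a sketch: the field names follow this page, while the concrete values are assumptions.

# Illustrative merged workflow input (not a specific API payload)
workflow_input = {
    "environment": "production",  # static input defined on the trigger
    "items": [{"externalId": "instance1", "space": "mySpace"}],  # dynamic input from a data modeling trigger
    "version": "v1",  # system metadata
}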

Set up trigger rules

Specify a triggerRule to define the conditions for running a trigger. triggerRule consists of a trigger type and its associated parameters.

Schedule a data workflow

Use the schedule trigger type to run a data workflow at regular intervals. The interval is specified by a cron expression.

For example, to run the trigger every day at 12:00 AM, use the cron expression "0 0 * * *". The time zone for the cron expression is UTC.

Scheduled triggers pass only the static input data you define when you create the trigger, plus system metadata, such as the workflow version.
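
For example, a scheduled trigger can be created with the Python SDK as follows. This is a minimal sketch; the external IDs and version are placeholders.

from cognite.client import CogniteClient
from cognite.client.data_classes.workflows import (
    WorkflowScheduledTriggerRule,
    WorkflowTriggerUpsert,
)

client = CogniteClient()  # assumes an already configured client

# Run the target data workflow every day at 12:00 AM (UTC)
client.workflows.triggers.upsert(
    WorkflowTriggerUpsert(
        external_id="my-scheduled-trigger",
        trigger_rule=WorkflowScheduledTriggerRule(cron_expression="0 0 * * *"),
        workflow_external_id="my-workflow-external-id",
        workflow_version="v1",
    )
)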

Trigger on data modeling events

Use the dataModeling trigger type to run a data workflow based on changes to data modeling instances matching a filter.

Specify the filter using the trigger's dataModelingQuery parameter. See also data modeling queries.

Data modeling triggers use a change-based polling mechanism:

  • Polling: The system periodically checks for instances matching your filter criteria.
  • Change detection: Triggers detect changes based on the lastUpdatedTime of instances.
  • Batching: Multiple matching instances are collected into batches.
  • Execution: When batch criteria are met, a workflow execution starts with the collected instances as input.

Prevent excessive data modeling trigger runs

Data modeling triggers fire on any change that updates the lastUpdatedTime property of instances matching your filter. To prevent excessive trigger runs:

  • Use specific filters: Ensure your filter is specific enough to avoid matching instances that shouldn't trigger the workflow.
  • Implement processing flags: Include a property like isProcessed in your data model to track workflow completion (see the sketch after this list).
    • Filter for instances where isProcessed is False.
    • Update isProcessed to True after processing to prevent re-triggering on the same instance.
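
For example, a filter combining both recommendations could look like this. This is a minimal sketch: the view and the isProcessed property are illustrative and assume your own data model.

from cognite.client.data_classes.data_modeling import ViewId
from cognite.client.data_classes.filters import And, Equals

example_view_id = ViewId(space="ExampleSpace", external_id="ExampleView", version="1")

# Match only Site A instances that the workflow has not processed yet
unprocessed_filter = And(
    Equals(property=example_view_id.as_property_ref("site"), value="Site A"),
    Equals(property=example_view_id.as_property_ref("isProcessed"), value=False),
)

Use a filter like this in the NodeResultSetExpression of the trigger's data modeling query, and let the workflow set isProcessed to True when it finishes with an instance.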

Control batching

You can control the batching of changes using the following parameters:

  • batchSize: The maximum number of items to pass as input to a workflow execution.
  • batchTimeout: The maximum time in seconds to wait for the batch to complete before passing it to a workflow execution.

When the batchTimeout expires, any partial batch is passed to a workflow execution. A complete batch is passed to a workflow execution immediately, without waiting for the timeout.

Example

This example shows how to create a trigger using the Python SDK. The trigger starts a workflow based on changes to instances of ExampleView with the property site equal to Site A. With the batch configuration shown, the trigger starts the workflow when it has collected 100 instance changes, or 60 seconds after the first change, whichever comes first.

from cognite.client import CogniteClient
from cognite.client.data_classes.data_modeling import ViewId
from cognite.client.data_classes.data_modeling.query import (
    NodeResultSetExpression,
    Select,
    SourceSelector,
)
from cognite.client.data_classes.filters import Equals
from cognite.client.data_classes.workflows import (
    WorkflowDataModelingTriggerRule,
    WorkflowTriggerDataModelingQuery,
    WorkflowTriggerUpsert,
)

client = CogniteClient()  # assumes an already configured client

# Specify the relevant data model view
example_view_id = ViewId(space="ExampleSpace", external_id="ExampleView", version="1")

# Define the trigger rule, including the data modeling query
trigger_rule = WorkflowDataModelingTriggerRule(
    data_modeling_query=WorkflowTriggerDataModelingQuery(
        # Query instances of ExampleView with property site="Site A"
        with_={
            "example_view": NodeResultSetExpression(
                filter=Equals(
                    property=example_view_id.as_property_ref("site"), value="Site A"
                )
            )
        },
        # Return properties "site" and "name" from the instances
        select={
            "example_view": Select(
                sources=[
                    SourceSelector(source=example_view_id, properties=["site", "name"])
                ]
            )
        },
    ),
    batch_size=100,  # Start the workflow when reaching 100 changes
    batch_timeout=60,  # Start the workflow 60 seconds after the first change
)

# Create the trigger
client.workflows.triggers.upsert(
    WorkflowTriggerUpsert(
        external_id="my-data-modeling-trigger",
        trigger_rule=trigger_rule,
        workflow_external_id="my-workflow-external-id",
        workflow_version="v1",
    )
)

Trigger targets

The trigger targets a data workflow, defined by a workflowExternalId and workflowVersion.

Trigger input data

You can define an input data object as part of the trigger. When the trigger starts the target data workflow, this object is passed as the workflow input.

Static input data

Static input data is defined when creating the trigger and remains constant for all executions:

{
  "external_id": "my-trigger",
  "input": {
    "environment": "production",
    "notification_email": "admin@company.com"
  }
}

This data is accessible in workflow tasks as ${workflow.input.environment} and ${workflow.input.notification_email}.
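
With the Python SDK, the same static input is provided through the input parameter when upserting the trigger. This is a minimal sketch; the external IDs and schedule are placeholders.

from cognite.client.data_classes.workflows import (
    WorkflowScheduledTriggerRule,
    WorkflowTriggerUpsert,
)

trigger = WorkflowTriggerUpsert(
    external_id="my-trigger",
    trigger_rule=WorkflowScheduledTriggerRule(cron_expression="0 0 * * *"),
    workflow_external_id="my-workflow-external-id",
    workflow_version="v1",
    # Static input, accessible to tasks as ${workflow.input.<key>}
    input={
        "environment": "production",
        "notification_email": "admin@company.com",
    },
)
client.workflows.triggers.upsert(trigger)  # client is a configured CogniteClient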

Dynamic input data (Data modeling triggers)

For data modeling triggers, the system automatically adds an items array to the workflow input containing the instances that matched the trigger criteria:

{
  "version": "v1",
  "items": [
    {
      "instanceType": "node",
      "externalId": "instance1",
      "space": "mySpace",
      "properties": {
        "mySpace": {
          "myView/version": {
            "site": "Site A",
            "name": "Equipment 1",
            "email": "user@company.com"
          }
        }
      }
    },
    {
      "instanceType": "node",
      "externalId": "instance2",
      "space": "mySpace",
      "properties": {
        "mySpace": {
          "myView/version": {
            "site": "Site A",
            "name": "Equipment 2",
            "email": "admin@company.com"
          }
        }
      }
    }
  ]
}

Access this data in your tasks using ${workflow.input.items}. Each item in the array represents a data modeling instance that matched the trigger criteria, including the properties you selected in the data modeling query.

Process multiple items

Since data modeling triggers provide arrays of items, design your workflow to handle multiple items appropriately:

Option 1: Process all items in a single function

{
  "externalId": "process-all-items",
  "type": "function",
  "parameters": {
    "function": {
      "externalId": "batch-processor",
      "data": {
        "items": "${workflow.input.items}"
      }
    }
  }
}
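
A Cognite Functions handler for the batch-processor function above could look like this. This is a minimal sketch; the property path mirrors the example input and depends on your space and view.

def handle(client, data):
    # data["items"] holds the instances passed via ${workflow.input.items}
    processed = []
    for item in data["items"]:
        properties = item["properties"]["mySpace"]["myView/version"]
        # ... process each instance here (illustrative)
        processed.append({"externalId": item["externalId"], "site": properties["site"]})
    return {"processedItems": processed}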

Option 2: Use dynamic tasks to process items individually

{
  "externalId": "generate-individual-tasks",
  "type": "function",
  "parameters": {
    "function": {
      "externalId": "task-generator",
      "data": {
        "items": "${workflow.input.items}"
      }
    }
  }
},
{
  "externalId": "process-individual-items",
  "type": "dynamic",
  "parameters": {
    "dynamic": {
      "tasks": "${generate-individual-tasks.output.tasks}"
    }
  },
  "dependsOn": ["generate-individual-tasks"]
}
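
The task-generator function could build the dynamic task definitions like this. This is a minimal sketch; single-item-processor is a hypothetical function.

def handle(client, data):
    # Return one function task per item; the dynamic task runs the generated tasks
    tasks = [
        {
            "externalId": f"process-item-{index}",
            "type": "function",
            "parameters": {
                "function": {
                    "externalId": "single-item-processor",  # hypothetical function
                    "data": {"item": item},
                }
            },
        }
        for index, item in enumerate(data["items"])
    ]
    return {"tasks": tasks}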

Authenticate a trigger

The trigger requires a nonce for authentication. This is a temporary token that's used for authentication when the data workflow execution starts. A nonce can be retrieved from the Sessions API when creating a session.

Note

Make sure that the nonce has the necessary capabilities to create a workflow run and perform the tasks in the workflow.
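
With the Python SDK, a nonce can be obtained by creating a session from client credentials. This is a minimal sketch; the credential values are placeholders.

from cognite.client.data_classes import ClientCredentials

# Create a session; its nonce authenticates the trigger's workflow executions
session = client.iam.sessions.create(
    client_credentials=ClientCredentials(
        client_id="my-client-id", client_secret="my-client-secret"
    )
)
nonce = session.nonce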

Trigger run history

You can retrieve a trigger's run history. This gives detailed information about each run, such as when it happened, which data workflow it successfully started, or why the trigger run failed. Use this information to monitor trigger performance and troubleshoot issues with automated workflow executions.

The run history includes:

  • Fire time: When the trigger fired.

  • Status: Whether the trigger successfully started a workflow (success) or failed (failed).

  • Workflow execution ID: The ID of the workflow execution that was started (if successful).

  • Reason for failure: Details about why the trigger failed.
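
With the Python SDK, the run history can be retrieved per trigger, for example as below. This is a hedged sketch; check your SDK version's reference for the exact method and field names.

# List the runs for a trigger and inspect when they fired and whether they succeeded
runs = client.workflows.triggers.get_trigger_run_history(
    external_id="my-data-modeling-trigger"
)
for run in runs:
    print(run.fire_time, run.status)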