
Use data workflows

Data workflows using the API and Python SDK

To learn how to manage data workflows using the API, see the API specifications.

For more information about using data workflows with Python, see the Cognite Python SDK documentation.

Run data workflows

You can trigger a workflow execution by invoking the API's /run endpoint.

Note: To add the necessary capabilities for data workflows, see assign capabilities.

Authenticate a workflow

In addition to the workflowExternalId and version, the /run endpoint requires a nonce: a temporary token used for authentication when the workflow triggers the processes defined by its tasks. You can retrieve a nonce from the Sessions API when you create a session. When you trigger a workflow with the Python SDK, the session is created behind the scenes.
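
For reference, here's a minimal sketch of triggering a workflow execution with the Python SDK. The credential values and the workflow external ID and version are placeholders; the trigger call is the same one used in the scheduling example later in this section.

from cognite.client import CogniteClient

# Placeholder credentials; replace with your own project, cluster, and IdP values.
client = CogniteClient.default_oauth_client_credentials(
    "my-project", "api", "my-tenant-id", "my-client-id", "my-client-secret"
)

# The SDK creates the session (and its nonce) behind the scenes and uses it
# when calling the /run endpoint. "my_workflow" and "v1" are placeholders.
execution = client.workflows.executions.trigger(
    workflow_external_id="my_workflow",
    version="v1",
)
print(f"Workflow execution ID: {execution.id}")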

Workflow input data and metadata

You can provide custom input data to a workflow execution and have the workflow tasks consume it using references. Input data is limited to 100KB, so it isn't intended for passing large amounts of data to the workflow. For larger payloads, use the data stores in CDF itself and have the tasks in the workflow read from and write to these.

A workflow execution can also have custom, application-specific metadata.
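
As a sketch of what this looks like with the Python SDK (assuming the trigger call accepts input and metadata arguments; the keys and values below are placeholders):

# client instantiated as in the previous sketch.
# Keep the input payload well below the 100KB limit.
execution = client.workflows.executions.trigger(
    workflow_external_id="my_workflow",
    version="v1",
    input={"asset_root": "my_root_asset", "run_mode": "full"},
    metadata={"triggered_by": "nightly_maintenance"},
)
# Tasks can then consume values from the input via references;
# see the API specifications for the exact reference syntax.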

Schedule a trigger

Data workflows don't currently support scheduled triggers. To run a workflow on a schedule, use a Cognite Function with schedules, as in the following example.

Step 1: Create the Cognite Function that will act as the workflow trigger

You must specify the client_credentials parameter in the call to client.workflows.executions.trigger for authentication to work at runtime.

# Enter credentials and instantiate client
from cognite.client import CogniteClient
cdf_cluster = ""  # "api", "greenfield", etc.
cdf_project = ""  # CDF project name
tenant_id = ""  # IdP tenant ID
client_id = ""  # IdP client ID
client_secret = ""  # IdP client secret

client = CogniteClient.default_oauth_client_credentials(
    cdf_project, cdf_cluster, tenant_id, client_id, client_secret
)

# Define Function handle
def handle(client, data, secrets):
    from cognite.client.data_classes import ClientCredentials

    execution = client.workflows.executions.trigger(
        workflow_external_id=data["workflow-external-id"],
        version=data["workflow-version"],
        client_credentials=ClientCredentials(
            secrets["client-id"], secrets["client-secret"]
        ),
    )

    return f"Workflow execution ID: {execution.id}"

# Create the Function
function_name = "workflow_trigger"
client.functions.create(
    name=function_name,
    external_id=function_name,
    function_handle=handle,
    secrets={
        "client-id": client_id,
        "client-secret": client_secret,
        "project": cdf_project,
    },
)
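
Optionally, you can verify the Function before scheduling it by calling it once with the same data payload the schedule will pass. This is a minimal sketch; the workflow external ID and version are placeholders, and the Function must have finished deploying (status Ready) before you call it.

# Optional: call the Function once to verify that it triggers the workflow.
# Wait until the Function status is "Ready" before calling.
call = client.functions.call(
    external_id=function_name,
    data={
        "workflow-external-id": "<Enter workflow external ID>",
        "workflow-version": "<Enter workflow version>",
    },
)
print(call.get_response())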

Step 2: Create a Cognite Function schedule for each scheduled workflow trigger you need

from cognite.client.data_classes import ClientCredentials

# Enter a cron expression for the schedule
cron_expression = "0 12 * * *"  # E.g. daily at 12:00. Use https://crontab.guru/ to create a valid expression
# Enter the details of the workflow to schedule
workflow_external_id = "<Enter workflow external ID>"
workflow_version = "<Enter workflow version>"

function_id = client.functions.retrieve(external_id=function_name).id
client.functions.schedules.create(
    name=f"Scheduled trigger: {workflow_external_id}",
    cron_expression=cron_expression,
    function_id=function_id,
    client_credentials=ClientCredentials(client_id, client_secret),
    data={
        "workflow-external-id": workflow_external_id,
        "workflow-version": workflow_version,
    },
)

Note: When triggering a workflow from a Cognite Function, use client credentials to create the session, as shown in the example above.