
Tasks in data workflows

Tasks trigger the execution of a process, like running transformations or simulations, calling functions, making API requests, or orchestrating other tasks. To trigger tasks in a workflow, you must first define the tasks. The task definitions are core components of workflow definitions and have the following anatomy:

{
  "tasks": [
    {
      "externalId": "<Task external Id>",
      "type": "<Type of task>",
      "parameters": {
        ...
      },
      "retries": 3,
      "timeout": 3600,
      "dependsOn": [
        ...
      ]
    }
    ...
  ]
}
| Name | Description |
| --- | --- |
| externalId | A unique ID to reference a task within a workflow. |
| type | The type of task to run: transformation, function, simulation, cdf, dynamic, or subworkflow. |
| parameters | Each type requires a distinct set of parameters. |
| dependsOn | The list of tasks that must complete before this task is triggered, given as an array of objects that each reference a task externalId. |
| timeout | The maximum duration a task can run before it's terminated. The value is always in seconds. The default value is 3600 seconds (1 hour). |
| retries | The number of retries if the task fails. You can set any value from 0 to 10. The default value is 3. |
| onFailure | Defines how task failures and timeouts are handled, and whether task completion is required or optional for the workflow to continue. The default value is abortWorkflow, which marks the workflow as FAILED if the task fails or times out. Optionally, set the parameter to skipTask, which marks the task as COMPLETED_WITH_ERRORS if it fails or times out, and continues the workflow execution. |
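For illustration, here's a complete task definition combining these fields, assuming a transformation with the hypothetical external ID asset-loader. With onFailure set to skipTask, the workflow continues even if this task fails or times out:

{
  "externalId": "load-assets",
  "type": "transformation",
  "parameters": {
    "transformation": {
      "externalId": "asset-loader"
    }
  },
  "retries": 1,
  "timeout": 600,
  "onFailure": "skipTask"
}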
tip

See also the API specifications for tasks in data workflows.

Dynamic references

Dynamic references are expressions used to dynamically inject data into task parameters during workflow executions. Define references to create parametric workflows by passing data from workflow inputs or between tasks.

Reference syntax

References must follow the format ${prefix.jsonPath}, where:

  • prefix specifies the data source: either the workflow input or the input/output of a task.
  • jsonPath uses a JSONPath notation to access nested properties.

Reference types

| Reference type | Format | Description |
| --- | --- | --- |
| Workflow input | ${workflow.input} | The entire input object passed to the workflow. |
| Workflow input property | ${workflow.input.propertyName} | A specific property from the workflow input. |
| Task output | ${taskExternalId.output} | The entire output object from a completed task. |
| Task output property | ${taskExternalId.output.propertyName} | A specific property from the output of a completed task. |
| Task input | ${taskExternalId.input} | The entire input object that was passed to a task. |
| Task input property | ${taskExternalId.input.propertyName} | A specific property from the input of a task. |

JSONPath for nested data

Use dot notation to access nested properties. This is an example of workflow input data:

{
  "data": {
    "user": {
      "email": "user@example.com",
      "preferences": {
        "timezone": "UTC"
      }
    },
    "items": ["item1", "item2"]
  }
}
Given this input, the references resolve as follows:

  • ${workflow.input.data.user.email} → "user@example.com"
  • ${workflow.input.data.user.preferences.timezone} → "UTC"
  • ${workflow.input.data.items} → ["item1", "item2"]

Make workflows parametric

Use references to create reusable workflows that behave differently based on input data:

{
  "externalId": "process-data",
  "type": "function",
  "parameters": {
    "function": {
      "externalId": "data-processor",
      "data": {
        "source": "${workflow.input.dataSource}",
        "config": "${workflow.input.processingConfig}",
        "previous_result": "${prepare-data.output.result}"
      }
    }
  },
  "dependsOn": [{"externalId": "prepare-data"}]
}
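For illustration, a workflow execution input that would satisfy the references above (the property values are hypothetical):

{
  "dataSource": "raw/sensor-readings",
  "processingConfig": {
    "mode": "incremental",
    "batchSize": 1000
  }
}

Here, ${workflow.input.dataSource} resolves to "raw/sensor-readings", and ${prepare-data.output.result} resolves to the result property of the prepare-data task's output.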

note

Not all task parameters support dynamic references. See the API documentation for specific parameter support.

Task statuses

During workflow execution, each task progresses through different statuses that indicate its current state:

| Status | Description |
| --- | --- |
| SCHEDULED | The task has been scheduled and is waiting to be picked up by an available worker. |
| IN_PROGRESS | The task is currently being executed by a worker. |
| COMPLETED | The task has completed successfully without any errors. |
| FAILED | The task failed to complete due to an error. It may be automatically retried if attempts remain within the configured limit. |
| FAILED_WITH_TERMINAL_ERROR | The task failed to complete due to an error and can't be automatically retried. |
| TIMED_OUT | The task didn't complete within its configured time limit and was marked as timed out by the system. Note: Any external process initiated by this task might still be running. Check the target system for its status. |
| CANCELED | The task was canceled without being completed because another task failed, and the workflow was configured to stop on failure. Note: Any external process initiated by this task might still be running. Check the target system for its status. |
| COMPLETED_WITH_ERRORS | The task encountered errors but is considered complete because it was marked as optional in the workflow version. The workflow continued execution despite the errors in this task. |
| NOT STARTED | The task is defined in the workflow version but hasn't yet been reached in the workflow execution. This state only appears in the user interface of data workflows. |

Task types

CDF Transformations tasks

Orchestrate CDF Transformation tasks, such as running SQL transformations to process and aggregate data, scheduling data ingestion and cleaning pipelines, or coordinating multiple transformations with dependencies.

  1. Set type to transformation.

  2. Under parameters, set externalId as the external ID of the CDF Transformation to be triggered.

{
  "type": "transformation",
  "parameters": {
    "transformation": {
      "externalId": "<CDF Transformation externalId>",
      "concurrencyPolicy": "fail"
    }
  }
}
  3. The optional parameter concurrencyPolicy determines the behavior of a task if the triggered transformation is already running. The following options are available:

| Option | Description |
| --- | --- |
| fail | The task fails if another instance of the transformation is currently running. This is the default option. |
| waitForCurrent | The task pauses and waits for the already running transformation to complete. Once the transformation completes, the task is completed. This mode is useful for preventing redundant transformation runs. |
| restartAfterCurrent | The task waits for the ongoing transformation to complete, then restarts the transformation. This mode ensures that subsequent tasks can use the most recent data. |

  4. The optional parameter useTransformationCredentials determines whether the transformation job uses the credentials specified in the transformation or the credentials of the workflow run. The default value is false, meaning the transformation job uses the credentials of the workflow run.
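For illustration, a transformation task combining these options. The external ID is hypothetical, and useTransformationCredentials is assumed to sit alongside concurrencyPolicy in the transformation object; verify the exact placement against the API specification:

{
  "type": "transformation",
  "parameters": {
    "transformation": {
      "externalId": "aggregate-daily-production",
      "concurrencyPolicy": "waitForCurrent",
      "useTransformationCredentials": true
    }
  }
}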

Cognite Functions tasks

Orchestrate Cognite Functions tasks, such as integrating with external APIs and systems for complex calculations, custom data processing logic, or data validation and quality checks.

  1. Set type to function.

  2. Under parameters, set the externalId as the external ID of the function to be triggered.

  3. Use data to provide input to the function. This is a JSON object that will be passed as the data argument to the function when it's triggered during the workflow execution.

{
  "type": "function",
  "parameters": {
    "function": {
      "externalId": "<Cognite Function externalId>",
      "data": "<JSON object to be passed as input to the Cognite Function>"
    },
    "isAsyncComplete": false
  }
}
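For illustration, a function task with a concrete data payload. The external ID and field names are hypothetical, and the reference injects a workflow input value at runtime:

{
  "type": "function",
  "parameters": {
    "function": {
      "externalId": "validate-sensor-readings",
      "data": {
        "threshold": 42,
        "source": "${workflow.input.dataSource}"
      }
    },
    "isAsyncComplete": false
  }
}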

Asynchronous task completion

A Cognite Function task can trigger a process running in a third-party system or prompt a user to take manual action as part of a workflow. Asynchronous completion of Cognite Function tasks uses the isAsyncComplete parameter:

When isAsyncComplete is set to false (default), the workflow will trigger the function. The status of the workflow task will reflect the status of the execution of the function. For instance, the task's status will be marked as COMPLETED in the workflow when the function has run and completed successfully.

When isAsyncComplete is set to true, the workflow will trigger the function. The status of the workflow task will remain IN_PROGRESS until the workflow receives an explicit callback to the update task status endpoint to change the status to one of COMPLETED, FAILED, or FAILED_WITH_TERMINAL_ERROR.

note

You must define the required logic in the Cognite Function to trigger the third-party process and the logic in the third-party process to update the task status in the data workflow.

This is an example of an asynchronous task completion:

[Figure: Asynchronous task completion]
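As a sketch of the callback itself, the third-party process could send a request like the following to the update task status endpoint once it finishes, assuming it knows the task ID of the workflow task. See the API specification for the exact endpoint path and payload:

POST https://{cluster}.cognitedata.com/api/v1/projects/{project}/workflows/tasks/{taskId}/update

{
  "status": "COMPLETED",
  "output": {
    "result": "<data made available to downstream tasks via references>"
  }
}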

Cognite simulation tasks

Orchestrate Cognite simulation tasks, such as engineering simulations for process optimization, predictive maintenance simulations to assess equipment health, what-if scenario analysis, or to integrate simulation results into data pipelines.

  1. Set type to simulation.

  2. Under parameters, set the routineExternalId as the external ID of the simulator routine to be triggered.

  3. Use the optional inputs parameter to override inputs in the routine. It accepts a list of input overrides that will be passed to the routine when it's triggered during workflow execution.

  4. Use the optional runTime parameter to specify when the simulation should run.

{
  "type": "simulation",
  "parameters": {
    "simulation": {
      "routineExternalId": "<Cognite simulator routine externalId>",
      "inputs": "<List of input overrides>",
      "runTime": "<Time at which the simulation should run>"
    }
  }
}
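For illustration, a simulation task overriding two routine inputs. The routine external ID and reference IDs are hypothetical, and the override shape (referenceId plus value) is an assumption to verify against the API specification:

{
  "type": "simulation",
  "parameters": {
    "simulation": {
      "routineExternalId": "pump-efficiency-routine",
      "inputs": [
        { "referenceId": "inlet_pressure", "value": 3.2 },
        { "referenceId": "flow_rate", "value": "${workflow.input.flowRate}" }
      ]
    }
  }
}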

CDF request task

Orchestrate a CDF task, such as reading data from CDF APIs to use in subsequent tasks, or to create or update CDF resources dynamically.

  1. Set type to cdf. The task type makes authenticated requests to the Cognite APIs using the credentials provided when the workflow is run.

  2. Under parameters, fill in the details of the HTTP request to be made:

  • resourcePath: the endpoint to call, prefixed by {cluster}.cognitedata.com/api/v1/projects/{project} based on the CDF cluster and project. For example, to filter time series, the resourcePath would be /timeseries/list.
  • queryParameters: any additional parameters to include in the CDF query.
  • method: the HTTP request method: POST, GET, PUT, or DELETE.
  • requestTimeoutInMillis: the request timeout, in milliseconds. If the URL doesn't respond within this interval, the task fails with the status TIMED_OUT.
  • body: the body of the request (optional).
note

If you use a GET method, you don't need the body parameter.

{
  "type": "cdf",
  "parameters": {
    "cdfRequest": {
      "resourcePath": "<CDF resource path>",
      "queryParameters": "<custom parameters>",
      "method": "<HTTP request method>",
      "requestTimeoutInMillis": <milliseconds>,
      "body": "<body>"
    }
  }
}
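For illustration, a cdf task that filters time series by an external ID prefix. The prefix and limit are hypothetical values:

{
  "type": "cdf",
  "parameters": {
    "cdfRequest": {
      "resourcePath": "/timeseries/list",
      "method": "POST",
      "requestTimeoutInMillis": 10000,
      "body": {
        "filter": { "externalIdPrefix": "pump_" },
        "limit": 100
      }
    }
  }
}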

Dynamic tasks

Use dynamic tasks when a set of workflow tasks and their execution order are determined at runtime. For instance, dynamic tasks can be used to process variable numbers of data sources, create workflows based on runtime conditions, handle different processing paths based on data content, or create fan-out patterns where the number of parallel tasks depends on input data.

The task takes the tasks parameter, which must be an array of valid task definitions.

The array of tasks must be determined during the runtime of the workflow and can't be included statically in the workflow definition. Instead, use a reference, meaning a dynamic value that will be evaluated when the workflow is run. The dynamic value can either refer to a part of the input to the execution or the output of another task in the workflow.

{
  "type": "dynamic",
  "parameters": {
    "dynamic": {
      "tasks": [
        <Reference>
      ]
    }
  }
}

This is an example workflow with dynamic task functionality.

  • The first task of the workflow is a Cognite Function, which generates and outputs an array of task definitions. The Python code run in the Function returns a dictionary that includes a tasks key.

  • The second task is a Dynamic task, which references and runs the set of tasks defined by the preceding step.

{
  "tasks": [
    {
      "externalId": "first-task",
      "type": "function",
      "parameters": {
        "function": {
          "externalId": "my-cognite-function"
        }
      }
    },
    {
      "externalId": "second-task",
      "type": "dynamic",
      "parameters": {
        "dynamic": {
          "tasks": "${first-task.output.tasks}"
        }
      },
      "dependsOn": [{"externalId": "first-task"}]
    }
  ]
}
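For illustration, the function in first-task might return a dictionary like the following, so that ${first-task.output.tasks} resolves to an array of valid task definitions (the task and transformation external IDs are hypothetical):

{
  "tasks": [
    {
      "externalId": "process-source-1",
      "type": "transformation",
      "parameters": {
        "transformation": { "externalId": "tr-source-1" }
      }
    },
    {
      "externalId": "process-source-2",
      "type": "transformation",
      "parameters": {
        "transformation": { "externalId": "tr-source-2" }
      }
    }
  ]
}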

Subworkflow task

Use subworkflow tasks to run a collection of tasks as a workflow within a workflow. This streamlines your workflows by grouping tasks with shared dependencies into a subworkflow, and it enhances the composability and reusability of your workflows by letting you incorporate subworkflows through references. For instance, use subworkflow tasks to group related tasks for better organization, create reusable task blocks within the same workflow, logically separate workflow phases, or isolate error handling for specific task groups.

Define the tasks directly in the current workflow definition, or reference another workflow.

Inline subworkflow (List of tasks)

Use the tasks parameter and provide an array of valid task definitions to include a list of tasks directly in the workflow definition:

{
  "type": "subworkflow",
  "parameters": {
    "subworkflow": {
      "tasks": [
        ...
      ]
    }
  }
}
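For illustration, an inline subworkflow grouping two dependent tasks. All external IDs are hypothetical:

{
  "externalId": "ingest-and-clean",
  "type": "subworkflow",
  "parameters": {
    "subworkflow": {
      "tasks": [
        {
          "externalId": "ingest",
          "type": "transformation",
          "parameters": {
            "transformation": { "externalId": "tr-ingest" }
          }
        },
        {
          "externalId": "clean",
          "type": "transformation",
          "parameters": {
            "transformation": { "externalId": "tr-clean" }
          },
          "dependsOn": [{"externalId": "ingest"}]
        }
      ]
    }
  }
}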

External workflow reference

Use external workflows to reuse existing workflows across multiple parent workflows, create modular workflow architectures, version and maintain complex workflow components separately, and build workflows from tested, proven components.

Use the workflowExternalId and version parameters to reference another workflow and to specify the workflow version to be embedded as a subworkflow:

{
  "type": "subworkflow",
  "parameters": {
    "subworkflow": {
      "workflowExternalId": "my-other-workflow",
      "version": "v1"
    }
  }
}
note

A workflow referenced in a subworkflow task can't contain a subworkflow task, limiting the depth to 1.