Tasks in data workflows

This section explains the configuration required to define and trigger a task in a workflow.

tip

For more information on the API specifications for tasks in data workflows, see API specifications.

Task definitions are the core components of a workflow definition. The anatomy of a task in a workflow definition is:

"tasks": [
{
"externalId": "<Task External Id>",
"type": "<Type of task>",
"parameters": {
...
},
"retries": 3,
"timeout": 3600,
"dependsOn": [
...
]
}
...
]
  • externalId: A unique ID to reference a task within a workflow.
  • type: The type of task to be executed: transformation, function, cdf, dynamic, or subworkflow.
  • parameters: Each task type requires a distinct set of parameters.
  • dependsOn: The list of tasks that must be executed before this task is triggered, given as an array of task external IDs.
  • timeout: The maximum duration a task can run before it is terminated. The value is always in seconds; the default is 3600 seconds (1 hour).
  • retries: The number of retries if the task fails. You can set any value from 0 to 10; the default is 3. Currently, there are no retries on timeout.
  • onFailure: Defines how task failures and timeouts are handled, and lets you decide whether the completion of a task is required or optional for the workflow to continue. The default value is abortWorkflow, which marks the workflow as FAILED if the task fails or times out. Optionally, set the parameter to skipTask, which marks the task as COMPLETED_WITH_ERRORS if it fails or times out, and continues the workflow execution.
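
Putting these fields together, a complete task definition could look like the following sketch (the task, transformation, and dependency external IDs are hypothetical):

{
  "externalId": "run-contextualization",
  "type": "transformation",
  "parameters": {
    "transformation": {
      "externalId": "contextualize-assets"
    }
  },
  "retries": 2,
  "timeout": 1800,
  "onFailure": "skipTask",
  "dependsOn": ["load-raw-data"]
}

Here, the task runs only after load-raw-data completes, and because onFailure is set to skipTask, the workflow continues even if the task fails.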

Task types

CDF Transformation task

To orchestrate a CDF Transformation task, set the type to transformation. Under parameters, set externalId to the external ID of the CDF Transformation to be triggered.

"type": "transformation",
"parameters": {
"transformation": {
"externalId": "<CDF Transformation externalId>"
"concurrencyPolicy": "fail",
}
}

The optional parameter concurrencyPolicy determines the behavior of a task if the triggered transformation is already running. The following options are available:

  • fail: The task fails if another instance of the transformation is currently running. This is the default option.

  • waitForCurrent: The task pauses and waits for the already running transformation to complete. Once the transformation completes, the task is completed as well. This mode is useful for preventing redundant transformation runs.

  • restartAfterCurrent: The task waits for the ongoing transformation to complete and then restarts the transformation. This mode ensures that subsequent tasks can use the most recent data.
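
For example, to avoid triggering a redundant run when the same transformation may already be in flight, a task could use waitForCurrent, as in this sketch (the transformation external ID is hypothetical):

"type": "transformation",
"parameters": {
  "transformation": {
    "externalId": "hourly-asset-hierarchy",
    "concurrencyPolicy": "waitForCurrent"
  }
}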

Cognite Function task

To orchestrate a Cognite Function task, set the type to function. Under parameters, set externalId to the external ID of the Function to be triggered. Use the data parameter to provide input to the Function. It accepts a JSON object that will be passed as the data argument to the Function when it's triggered during the workflow execution.

"type": "function",
"parameters": {
"function": {
"externalId": "<Cognite Function externalId>",
"data": "<JSON object to be passed as input to the Cognite Function>"
}
"isAsyncComplete": false,
}
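
As a sketch, a Function task passing a small input payload could look like this (the Function external ID and the keys in data are hypothetical):

"type": "function",
"parameters": {
  "function": {
    "externalId": "compute-daily-kpis",
    "data": {"site": "oslo", "windowHours": 24}
  },
  "isAsyncComplete": false
}

The data object is passed as the data argument to the Function when the task is triggered.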

Asynchronous task completion

A Cognite Function task can trigger a process running in a third-party system or prompt a user to take manual action as part of a workflow. For this type of usage, asynchronous completion of Cognite Function tasks is supported through the isAsyncComplete parameter.

When the isAsyncComplete parameter is set to false (the default), the workflow triggers the Cognite Function, and the status of the workflow task reflects the status of the function execution (e.g., the task is marked as COMPLETED in the workflow when the function has executed and completed successfully).

When the isAsyncComplete parameter is set to true, the workflow triggers the Cognite Function, and the task status remains IN_PROGRESS until the workflow receives an explicit callback to the update task status endpoint changing the status to one of COMPLETED, FAILED, or FAILED_WITH_TERMINAL_ERROR.
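
A minimal sketch of an asynchronously completed task, assuming a hypothetical Function that kicks off an external approval process and a hypothetical data payload:

"type": "function",
"parameters": {
  "function": {
    "externalId": "trigger-external-approval",
    "data": {"approvalRequestId": "req-123"}
  },
  "isAsyncComplete": true
}

The task stays IN_PROGRESS after the Function returns, until the external process calls back to the update task status endpoint.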

The diagram below illustrates an example of asynchronous task completion.

Note: You must define the logic in the Cognite Function that triggers the third-party process, as well as the logic in the third-party process that updates the task status in the data workflow.

[Diagram: Asynchronous task completion]

CDF request task

To orchestrate a CDF task, set the type to cdf. This task type makes authenticated requests to the CDF APIs using the credentials provided when the workflow is executed.

Under parameters, fill in the details of the HTTP request to be made:

  • The resourcePath defines the endpoint to be called. It is prefixed by {cluster}.cognitedata.com/api/v1/projects/{project} based on the CDF cluster and project. For example, to filter time series in CDF, the resourcePath would be /timeseries/list.
  • In queryParameters, enter any additional parameters that should be part of the CDF query.
  • The method defines the HTTP request method: POST, GET, PUT, DELETE, or LIST.
  • requestTimeoutInMillis sets how long to wait, in milliseconds, for a response after the request is triggered. If no response arrives within this interval, the task fails with the status TIMED_OUT.
  • Specifying the body of the request is optional.

Note: Requests using the GET method don't require a body parameter.

"type": "cdf",
"parameters": {
"cdfRequest": {
"resourcePath": "<CDF resource path>",
"queryParameters": "<custom parameters>",
"method": "<HTTP request method>",
"requestTimeoutInMillis": <milliseconds>,
"body": "<body>"
},
}
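
Building on the /timeseries/list example above, a concrete CDF request task could look like the following sketch (the filter and limit values are hypothetical):

"type": "cdf",
"parameters": {
  "cdfRequest": {
    "resourcePath": "/timeseries/list",
    "method": "POST",
    "requestTimeoutInMillis": 10000,
    "body": {"filter": {"externalIdPrefix": "pump_"}, "limit": 100}
  }
}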

Dynamic task

Use a dynamic task when the set of workflow tasks and their execution order are determined at runtime. It takes the tasks parameter, which must be an array of valid task definitions.

This array of tasks is determined during the runtime of the workflow and therefore cannot be included statically in the workflow definition. Instead, use a reference: a dynamic value that is evaluated when the workflow is executed. The dynamic value can refer either to a part of the input to the execution or to the output of another task in the workflow.

"type": "dynamic",
"parameters": {
"dynamic": {
"tasks": [
<Reference>
]
}
}

The example workflow below illustrates the dynamic task functionality.

  1. The first task of the workflow is a Cognite Function that generates and outputs an array of task definitions (the Python code executed in the Function returns a dictionary that includes a tasks key).
  2. The second task is a dynamic task, which references and executes the set of tasks defined by the preceding step.
"tasks": [
  {
    "externalId": "first-task",
    "type": "function",
    "parameters": {
      "function": {
        "externalId": "my-cognite-function"
      }
    }
  },
  {
    "externalId": "second-task",
    "type": "dynamic",
    "parameters": {
      "dynamic": {
        "tasks": "${first-task.output.tasks}"
      }
    },
    "dependsOn": ["first-task"]
  }
]
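
For reference, the tasks array returned by my-cognite-function could look like the following sketch (the generated task and transformation external IDs are hypothetical):

"tasks": [
  {
    "externalId": "generated-task-1",
    "type": "transformation",
    "parameters": {
      "transformation": {"externalId": "contextualize-site-a"}
    }
  },
  {
    "externalId": "generated-task-2",
    "type": "transformation",
    "parameters": {
      "transformation": {"externalId": "contextualize-site-b"}
    }
  }
]

Because these definitions are produced by the Function at runtime, the workflow can scale the number of generated tasks to the data it encounters.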

Subworkflow task

The subworkflow task type is similar to the dynamic task type, but the array of tasks is included statically in the workflow definition instead of being generated at runtime. Use the subworkflow task type to simplify the workflow definition if you have many individual tasks that should be executed in parallel and share common upstream and downstream dependencies.

It takes the tasks parameter, which should be an array of valid task definitions.

"type": "subworkflow",
"parameters": {
"subworkflow": {
"tasks": [
...
]
}
}
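
As a sketch, a subworkflow grouping two parallel Function tasks behind a single upstream dependency could look like this (all external IDs are hypothetical):

{
  "externalId": "parallel-ingestion",
  "type": "subworkflow",
  "parameters": {
    "subworkflow": {
      "tasks": [
        {
          "externalId": "ingest-source-a",
          "type": "function",
          "parameters": {"function": {"externalId": "ingest-a"}}
        },
        {
          "externalId": "ingest-source-b",
          "type": "function",
          "parameters": {"function": {"externalId": "ingest-b"}}
        }
      ]
    }
  },
  "dependsOn": ["prepare-raw-data"]
}

Downstream tasks can then depend on parallel-ingestion alone instead of listing every individual task.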

Other concepts

Task input and output

Each task receives its task definition as input during execution. Depending on the type, a task returns a different set of outputs.

  • Transformation tasks return the jobId of the transformation job in CDF.

  • Function tasks return the callId and id of the Function in CDF and its response. See Functions for more information.

  • CDF tasks return the statusCode and response (body).
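
Downstream tasks can consume these outputs through references (see References below). For example, assuming a transformation task exposes jobId in its output as described above, a Function task could receive it as input, as in this sketch (the task and Function external IDs are hypothetical):

"externalId": "notify-on-completion",
"type": "function",
"parameters": {
  "function": {
    "externalId": "send-notification",
    "data": {"jobId": "${run-transformation.output.jobId}"}
  }
},
"dependsOn": ["run-transformation"]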

caution

Secrets as input to workflows and tasks

Some tasks in a workflow may require access to secrets or other confidential information, such as client credentials, during execution. Data workflows currently do not support secure storage of such secrets as input to workflows or tasks. For such cases, the recommended solution is to leverage the capabilities for secure storage of secrets in Cognite Functions, combined with a Function task in the workflow. For more information, see Functions.

References

When specifying task properties in the workflow definition, you can use static values (strings) or references (expressions). A reference is an expression that dynamically injects a value into a task during execution. Use references to access the workflow's input or the input or output of a previous task in the workflow.

Note: The injected value must be valid in the context of the parameter the reference is used for.

References must adhere to the format ${prefix.jsonPath}, a JSON Path preceded by a prefix. The valid prefixes are:

  • <taskExternalId>.output
  • <taskExternalId>.input
  • workflow.input

The jsonPath refers to the path of a key in the JSON object defined by the prefix.

For instance, a task output reference could look like ${myTaskExternalId.output.someKey}, and a workflow input reference like this: ${workflow.input.myKey}.
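
Putting this together, a task could forward part of the workflow's execution input to a Function, as in this sketch (the external IDs and the datasetId key are hypothetical):

"externalId": "process-dataset",
"type": "function",
"parameters": {
  "function": {
    "externalId": "process-data",
    "data": {"datasetId": "${workflow.input.datasetId}"}
  }
}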