Tasks in data workflows
Tasks trigger the execution of a process, such as running transformations or simulations, calling functions, making API requests, or orchestrating other tasks. Before a workflow can trigger tasks, you must define them. Task definitions are core components of a workflow definition and have the following anatomy:
{
  "tasks": [
    {
      "externalId": "<Task external Id>",
      "type": "<Type of task>",
      "parameters": {
        ...
      },
      "retries": 3,
      "timeout": 3600,
      "onFailure": "abortWorkflow",
      "dependsOn": [
        ...
      ]
    },
    ...
  ]
}
Name | Description |
---|---|
externalId | A unique ID to reference a task within a workflow. |
type | The type of task to run: transformation, function, simulation, cdf, dynamic, or subworkflow. |
parameters | For each type, a distinct set of parameters is required. |
dependsOn | The list of tasks to run before this task is triggered. The list is an array of task external IDs. |
timeout | The maximum duration a task can run before it's terminated. The default value is 3600 seconds (1 hour). The value is always in seconds. |
retries | The number of retries if the task fails. You can set any value from 0 to 10. The default value is 3. |
onFailure | Defines how task failures and timeouts are handled and whether the task must complete for the workflow to continue. The default value is abortWorkflow, which marks the workflow as FAILED if the task fails or times out. Optionally, set the parameter to skipTask to mark the task as COMPLETED_WITH_ERRORS if it fails or times out and continue the workflow execution. |
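For example, a minimal two-task workflow definition might look like the following sketch. All external IDs are hypothetical, the second task is marked as optional with onFailure set to skipTask, and dependsOn entries are written in the object form used in the dynamic task example later in this section:
{
  "tasks": [
    {
      "externalId": "load-source-data",
      "type": "transformation",
      "parameters": {
        "transformation": {
          "externalId": "load-source-data-transformation"
        }
      },
      "retries": 3,
      "timeout": 3600,
      "onFailure": "abortWorkflow"
    },
    {
      "externalId": "notify-on-load",
      "type": "function",
      "parameters": {
        "function": {
          "externalId": "notification-function",
          "data": {"message": "Source data loaded"}
        }
      },
      "retries": 0,
      "timeout": 600,
      "onFailure": "skipTask",
      "dependsOn": [{"externalId": "load-source-data"}]
    }
  ]
}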
See also the API specifications for tasks in data workflows.
Dynamic references
Dynamic references are expressions used to dynamically inject data into task parameters during workflow executions. Define references to create parametric workflows by passing data from workflow inputs or between tasks.
Reference syntax
References must follow the format ${prefix.jsonPath}, where:
- prefix specifies the data source: the workflow input or a task's output/input.
- jsonPath uses JSONPath notation to access nested properties.
Reference types
Reference type | Format | Description |
---|---|---|
Workflow input | ${workflow.input} | The entire input object passed to the workflow. |
Workflow input property | ${workflow.input.propertyName} | A specific property from the workflow input. |
Task output | ${taskExternalId.output} | The entire output object from a completed task. |
Task output property | ${taskExternalId.output.propertyName} | A specific property from the output of a completed task. |
Task input | ${taskExternalId.input} | The entire input object that was passed to a task. |
Task input property | ${taskExternalId.input.propertyName} | A specific property from the input of a task. |
JSONPath for nested data
Use dot notation to access nested properties. This is an example of workflow input data:
{
  "data": {
    "user": {
      "email": "user@example.com",
      "preferences": {
        "timezone": "UTC"
      }
    },
    "items": ["item1", "item2"]
  }
}
- ${workflow.input.data.user.email} → "user@example.com"
- ${workflow.input.data.user.preferences.timezone} → "UTC"
- ${workflow.input.data.items} → ["item1", "item2"]
Make workflows parametric
Use references to create reusable workflows that behave differently based on input data:
{
  "externalId": "process-data",
  "type": "function",
  "parameters": {
    "function": {
      "externalId": "data-processor",
      "data": {
        "source": "${workflow.input.dataSource}",
        "config": "${workflow.input.processingConfig}",
        "previous_result": "${prepare-data.output.result}"
      }
    }
  },
  "dependsOn": ["prepare-data"]
}
Not all task parameters support dynamic references. See the API documentation for specific parameter support.
Task statuses
During workflow execution, each task progresses through different statuses that indicate its current state:
Status | Description |
---|---|
SCHEDULED | The task has been scheduled and is waiting to be picked up by an available worker. |
IN_PROGRESS | The task is currently being executed by a worker. |
COMPLETED | The task has been successfully completed without any errors. |
FAILED | The task failed to complete due to an error. It may be automatically retried if attempts remain within the configured limit. |
FAILED_WITH_TERMINAL_ERROR | The task failed to complete due to an error and cannot be automatically retried. |
TIMED_OUT | The task did not complete within its configured time limit and was marked as timed out by the system. Note: Any external process initiated by this task might still be running. Check the target system for its status. |
CANCELED | The task was canceled without being completed because another task failed, and the workflow was configured to stop on failure. Note: Any external process initiated by this task might still be running. Check the target system for the status. |
COMPLETED_WITH_ERRORS | The task encountered errors but is considered complete because it was marked as optional in the workflow version. The workflow continued execution despite the errors in this task. |
NOT STARTED | The task is defined in the workflow version but has not yet been reached in the workflow execution. This status is only shown in the data workflows user interface. |
Task types
CDF Transformations tasks
Orchestrate CDF Transformations, such as running SQL transformations to process and aggregate data, scheduling data ingestion and cleaning pipelines, or coordinating multiple transformations with dependencies.
- Set type to transformation.
- Under parameters, set externalId as the external ID of the CDF Transformation to be triggered.
{
  "type": "transformation",
  "parameters": {
    "transformation": {
      "externalId": "<CDF Transformation externalId>",
      "concurrencyPolicy": "fail"
    }
  }
}
- The optional parameter concurrencyPolicy determines the behavior of the task if the triggered transformation is already running. The following options are available:
Option | Description |
---|---|
fail | The task fails if another instance of the transformation is currently running. This is the default option. |
waitForCurrent | The task will pause and wait for the already running transformation to complete. Once the transformation is completed, the task is completed. This mode is useful in preventing redundant transformation runs. |
restartAfterCurrent | The task waits for the ongoing transformation to complete. After completion, the task restarts the transformation. This mode ensures that the upcoming/following tasks can use the most recent data. |
- The optional parameter useTransformationCredentials determines whether the transformation job uses the credentials specified in the transformation or the credentials of the workflow run. The default value is false, meaning the transformation job uses the credentials of the workflow run.
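For example, a task combining both optional parameters might look like the following sketch. The external IDs are hypothetical, and the placement of useTransformationCredentials alongside concurrencyPolicy under the transformation object is an assumption to verify against the API specification:
{
  "externalId": "run-aggregation",
  "type": "transformation",
  "parameters": {
    "transformation": {
      "externalId": "aggregate-sensor-data",
      "concurrencyPolicy": "waitForCurrent",
      "useTransformationCredentials": true
    }
  }
}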
Cognite Functions tasks
Orchestrate Cognite Functions for tasks such as integrating with external APIs and systems, complex calculations, custom data processing logic, or data validation and quality checks.
- Set type to function.
- Under parameters, set externalId as the external ID of the function to be triggered.
- Use data to provide input to the function. Use a JSON string that will be passed as the data argument to the function when it's triggered during the workflow execution.
{
  "type": "function",
  "parameters": {
    "function": {
      "externalId": "<Cognite Function externalId>",
      "data": "<JSON object to be passed as input to the Cognite Function>"
    },
    "isAsyncComplete": false
  }
}
Asynchronous task completion
A Cognite Function task can trigger a process running in a third-party system or prompt a user to take manual action as part of a workflow. Asynchronous completion of Cognite Function tasks uses the isAsyncComplete parameter:
When isAsyncComplete is set to false (default), the workflow will trigger the function. The status of the workflow task will reflect the status of the execution of the function. For instance, the task's status will be marked as COMPLETED in the workflow when the function has run and completed successfully.
When isAsyncComplete is set to true, the workflow will trigger the function. The status of the workflow task will remain IN_PROGRESS until the workflow receives an explicit callback to the update task status endpoint to change the status to one of COMPLETED, FAILED, or FAILED_WITH_TERMINAL_ERROR.
You must define the required logic in the Cognite Function to trigger the third-party process and the logic in the third-party process to update the task status in the data workflow.
(Diagram: example of an asynchronous task completion flow.)
Cognite simulation tasks
Orchestrate Cognite simulations, such as engineering simulations for process optimization, predictive maintenance simulations to assess equipment health, what-if scenario analysis, or integration of simulation results into data pipelines.
- Set type to simulation.
- Under parameters, set routineExternalId as the external ID of the simulator routine to be triggered.
- Use the optional inputs parameter to override inputs in the routine. It accepts a list of input overrides that will be passed to the routine when it's triggered during workflow execution.
- Use the optional runTime parameter to specify when the simulation should run.
{
  "type": "simulation",
  "parameters": {
    "simulation": {
      "routineExternalId": "<Cognite simulator routine externalId>",
      "inputs": "<List of input overrides>",
      "runTime": "<Time at which the simulation should run>"
    }
  }
}
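As a sketch, the optional parameters can also be filled in at runtime with dynamic references from the workflow input. The routine external ID and input property names are hypothetical, and whether inputs and runTime accept references should be verified against the API documentation, since not all task parameters support dynamic references:
{
  "externalId": "run-simulation",
  "type": "simulation",
  "parameters": {
    "simulation": {
      "routineExternalId": "compressor-performance-routine",
      "inputs": "${workflow.input.simulationInputs}",
      "runTime": "${workflow.input.simulationRunTime}"
    }
  }
}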
CDF request task
Orchestrate CDF API requests, such as reading data from CDF APIs to use in subsequent tasks, or creating or updating CDF resources dynamically.
- Set type to cdf. The task type makes authenticated requests to the Cognite APIs using the credentials provided when the workflow is run.
- Under parameters, fill in the details of the HTTP request to be made:
  - resourcePath: the endpoint to be called. It's prefixed by {cluster}.cognitedata.com/api/v1/projects/{project} based on the CDF cluster and project. For example, to filter time series, the resourcePath would be /timeseries/list.
  - queryParameters: any additional parameters that should be part of the CDF query.
  - method: the HTTP request method - POST, GET, PUT, or DELETE.
  - requestTimeoutInMillis: the request timeout in milliseconds. If no response is received within the timeout interval, the task fails with the status TIMED_OUT.
  - body: the body of the request. This is optional; if you use a GET method, you don't need the body parameter.
{
  "type": "cdf",
  "parameters": {
    "cdfRequest": {
      "resourcePath": "<CDF resource path>",
      "queryParameters": "<custom parameters>",
      "method": "<HTTP request method>",
      "requestTimeoutInMillis": <milliseconds>,
      "body": "<body>"
    }
  }
}
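For example, the following sketch lists time series whose external IDs start with a given prefix. The task external ID and filter values are hypothetical; check the Time series API specification for the exact /timeseries/list body format:
{
  "externalId": "list-pump-timeseries",
  "type": "cdf",
  "parameters": {
    "cdfRequest": {
      "resourcePath": "/timeseries/list",
      "method": "POST",
      "requestTimeoutInMillis": 10000,
      "body": {
        "filter": {
          "externalIdPrefix": "pump_"
        },
        "limit": 100
      }
    }
  }
}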
Dynamic tasks
Use dynamic tasks when a set of workflow tasks and their execution order are determined at runtime. For instance, dynamic tasks can be used to process variable numbers of data sources, create workflows based on runtime conditions, handle different processing paths based on data content, or create fan-out patterns where the number of parallel tasks depends on input data.
The task takes the tasks parameter, which must be an array of valid task definitions.
The array of tasks must be determined during the runtime of the workflow and can't be included statically in the workflow definition. Instead, use a reference, meaning a dynamic value that will be evaluated when the workflow is run. The dynamic value can either refer to a part of the input to the execution or the output of another task in the workflow.
{
  "type": "dynamic",
  "parameters": {
    "dynamic": {
      "tasks": [
        <Reference>
      ]
    }
  }
}
This is an example workflow with dynamic task functionality:
- The first task of the workflow is a Cognite Function, which generates and outputs an array of task definitions. The Python code run in the Function returns a dictionary that includes a tasks key.
- The second task is a dynamic task, which references and runs the set of tasks defined by the preceding step.
{
  "tasks": [
    {
      "externalId": "first-task",
      "type": "function",
      "parameters": {
        "function": {
          "externalId": "my-cognite-function"
        }
      }
    },
    {
      "externalId": "second-task",
      "type": "dynamic",
      "parameters": {
        "dynamic": {
          "tasks": "${first-task.output.tasks}"
        }
      },
      "dependsOn": [{"externalId": "first-task"}]
    }
  ]
}
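For reference, this is a sketch of the output the Cognite Function in first-task could return so that second-task can run the generated tasks. The generated task and transformation external IDs are hypothetical:
{
  "tasks": [
    {
      "externalId": "generated-task-1",
      "type": "transformation",
      "parameters": {
        "transformation": {
          "externalId": "contextualize-source-a"
        }
      }
    }
  ]
}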
Subworkflow task
Use subworkflow tasks to run a collection of tasks as a workflow within a workflow. This streamlines your workflows by grouping tasks with shared dependencies into a subworkflow or enhances the composability and reusability of your workflows by incorporating subworkflows through references. For instance, you can use subworkflow tasks to group related tasks for better organization, create reusable task blocks within the same workflow, logically separate workflow phases, or isolate error handling for specific task groups.
Define these tasks directly in the current workflow definition, or reference another workflow.
Inline subworkflow (List of tasks)
Use the tasks parameter and provide an array of valid task definitions to include a list of tasks directly in the workflow definition:
{
  "type": "subworkflow",
  "parameters": {
    "subworkflow": {
      "tasks": [
        ...
      ]
    }
  }
}
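For example, the following sketch groups two related transformation tasks into one inline subworkflow; all external IDs are hypothetical:
{
  "externalId": "ingest-and-clean",
  "type": "subworkflow",
  "parameters": {
    "subworkflow": {
      "tasks": [
        {
          "externalId": "ingest-raw-data",
          "type": "transformation",
          "parameters": {
            "transformation": {
              "externalId": "ingest-raw-data-transformation"
            }
          }
        },
        {
          "externalId": "clean-raw-data",
          "type": "transformation",
          "parameters": {
            "transformation": {
              "externalId": "clean-raw-data-transformation"
            }
          },
          "dependsOn": [{"externalId": "ingest-raw-data"}]
        }
      ]
    }
  }
}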
External workflow reference
Use external workflows to reuse existing workflows across multiple parent workflows, create modular workflow architectures, version and maintain complex workflow components separately, and build workflows from tested, proven components.
Use the workflowExternalId and version parameters to reference another workflow and to specify the workflow version to be embedded as a subworkflow:
{
  "type": "subworkflow",
  "parameters": {
    "subworkflow": {
      "workflowExternalId": "my-other-workflow",
      "version": "v1"
    }
  }
}
A workflow referenced in a subworkflow task can't contain a subworkflow task, limiting the depth to 1.