Tasks in data workflows
This section explains the configuration required to trigger a task in a workflow.
For more information on the API specifications for tasks in data workflows, see API specifications.
Task definitions are the core components of a workflow definition. A task in a workflow definition has the following anatomy:
"tasks": [
    {
        "externalId": "<Task External Id>",
        "type": "<Type of task>",
        "parameters": {
            ...
        },
        "retries": 3,
        "timeout": 3600,
        "dependsOn": [
            ...
        ]
    }
    ...
]
Name | Description |
---|---|
externalId | A unique ID to reference a task within a workflow. |
type | The type of task to be executed: transformation, function, cdf, dynamic, or subworkflow. |
parameters | For each type, a distinct set of parameters is required. |
dependsOn | The list of tasks that should be executed before this task is triggered. It contains an array of task external IDs. |
timeout | The maximum duration a task can run before it is terminated. The default value is 3600 seconds (1 hour). The value is always in seconds. |
retries | The number of retries if the task fails. You can set any value from 0 to 10. The default value is 3. Currently, there are no retries on timeout. |
onFailure | Defines how task failures and timeouts are handled, that is, whether completion of the task is required or optional for the workflow to continue. The default value is abortWorkflow, which marks the workflow as FAILED if the task fails or times out. Optionally, set the parameter to skipTask, which marks the task as COMPLETED_WITH_ERRORS if the task fails or times out and continues the workflow execution. |
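The field rules in the table can be sketched as a small Python helper that builds a task definition. This is only an illustration: make_task is a hypothetical name and not part of any Cognite SDK, the external IDs are made up, and the checks simply mirror the limits stated above (retries from 0 to 10, timeout in seconds, onFailure set to abortWorkflow or skipTask).

```python
import json

VALID_TYPES = {"transformation", "function", "cdf", "dynamic", "subworkflow"}
VALID_ON_FAILURE = {"abortWorkflow", "skipTask"}


def make_task(external_id, task_type, parameters, depends_on=(),
              retries=3, timeout=3600, on_failure="abortWorkflow"):
    """Build a task definition dict (hypothetical helper, not an SDK call)."""
    if task_type not in VALID_TYPES:
        raise ValueError(f"unknown task type: {task_type}")
    if not 0 <= retries <= 10:
        raise ValueError("retries must be between 0 and 10")
    # Assumption: a timeout must be a positive number of seconds.
    if timeout <= 0:
        raise ValueError("timeout must be a positive number of seconds")
    if on_failure not in VALID_ON_FAILURE:
        raise ValueError(f"unknown onFailure value: {on_failure}")
    return {
        "externalId": external_id,
        "type": task_type,
        "parameters": parameters,
        "retries": retries,
        "timeout": timeout,
        "onFailure": on_failure,
        # dependsOn uses the object form shown in the dynamic task example.
        "dependsOn": [{"externalId": dep} for dep in depends_on],
    }


task = make_task(
    "load-assets",
    "transformation",
    {"transformation": {"externalId": "my-transformation"}},
    depends_on=["extract-assets"],
    on_failure="skipTask",
)
print(json.dumps(task, indent=2))
```

Keeping the defaults in one place makes it harder to submit a definition with an out-of-range retries value or a misspelled onFailure option.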
Task types
CDF Transformation task
To orchestrate a CDF Transformation task, set the type to transformation. Under parameters, set the externalId to the external ID of the CDF Transformation that needs to be triggered.
"type": "transformation",
"parameters": {
    "transformation": {
        "externalId": "<CDF Transformation externalId>",
        "concurrencyPolicy": "fail"
    }
}
The optional parameter concurrencyPolicy determines the behavior of a task if the triggered transformation is already running. The following options are available:
- fail: The task fails if another instance of the transformation is currently running. This is the default option.
- waitForCurrent: The task pauses and waits for the already running transformation to complete. Once the transformation is completed, the task is completed. This mode is useful in preventing redundant transformation runs.
- restartAfterCurrent: The task waits for the ongoing transformation to complete. After completion, the task restarts the transformation. This mode ensures that the tasks that follow can use the most recent data.
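The three options can be read as a small decision table. The sketch below models it in plain Python. It isn't how the service is implemented: resolve_concurrency is a hypothetical name, the returned strings are descriptive labels only, and the behavior when no run is in progress is an assumption.

```python
def resolve_concurrency(policy, already_running):
    """Return the action a task takes for a given policy (illustrative model)."""
    if policy not in {"fail", "waitForCurrent", "restartAfterCurrent"}:
        raise ValueError(f"unknown concurrencyPolicy: {policy}")
    if not already_running:
        # Assumption: with no run in progress, every policy just starts a run.
        return "start transformation"
    if policy == "fail":
        return "fail task"
    if policy == "waitForCurrent":
        # The task completes together with the run that was already in progress.
        return "wait for current run, then complete task"
    # restartAfterCurrent: wait, then trigger a fresh run.
    return "wait for current run, then start transformation again"


for policy in ("fail", "waitForCurrent", "restartAfterCurrent"):
    print(policy, "->", resolve_concurrency(policy, already_running=True))
```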
Cognite Function task
To orchestrate a Cognite Function task, set the type to function. Under parameters, set the externalId to the external ID of the function that needs to be triggered. Use the data parameter to provide input to the Function. It accepts a JSON string that will be passed as the data argument to the Function when it’s triggered during the workflow execution.
"type": "function",
"parameters": {
    "function": {
        "externalId": "<Cognite Function externalId>",
        "data": "<JSON object to be passed as input to the Cognite Function>"
    },
    "isAsyncComplete": false
}
Asynchronous task completion
A Cognite Function task can trigger a process running in a third-party system or prompt a user to take manual action as part of a workflow. For this type of usage, asynchronous completion of Cognite Function tasks is supported through the isAsyncComplete parameter.
When the isAsyncComplete parameter is set to false (default), the workflow triggers the Cognite Function, and the status of the workflow task reflects the status of the function execution (for example, the task is marked as COMPLETED in the workflow when the function has executed and completed successfully).
When the isAsyncComplete parameter is set to true, the workflow triggers the Cognite Function, and the task status remains IN_PROGRESS until the workflow receives an explicit callback to the update task status endpoint that changes the status to one of COMPLETED, FAILED, or FAILED_WITH_TERMINAL_ERROR.
The example in the diagram illustrates an asynchronous task completion.
Note: The user has to define the required logic in the Cognite Function to trigger the third-party process and the logic in the third-party process to update the task status in the data workflow.
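The status handling for an asynchronous task can be pictured with a small local model. The class below is a stand-in written for illustration only: it makes no network calls, AsyncTask and update_status are hypothetical names, the task external ID is made up, and the real callback goes to the update task status endpoint.

```python
TERMINAL_STATUSES = {"COMPLETED", "FAILED", "FAILED_WITH_TERMINAL_ERROR"}


class AsyncTask:
    """Local stand-in for a workflow task with isAsyncComplete set to true."""

    def __init__(self, external_id):
        self.external_id = external_id
        # The status stays IN_PROGRESS after the function has been triggered.
        self.status = "IN_PROGRESS"

    def update_status(self, new_status):
        """Mimic the callback to the update task status endpoint."""
        if new_status not in TERMINAL_STATUSES:
            raise ValueError(f"status must be one of {sorted(TERMINAL_STATUSES)}")
        if self.status != "IN_PROGRESS":
            # Assumption: a task that has already finished is not updated again.
            raise RuntimeError("task is no longer in progress")
        self.status = new_status


task = AsyncTask("approve-report")
print(task.status)               # the task waits for the callback
task.update_status("COMPLETED")  # sent by the third-party process
print(task.status)
```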
CDF request task
To orchestrate a CDF task, set the type to cdf. This task type makes authenticated requests to the CDF APIs using the credentials provided when the workflow is executed.
Under parameters, fill in the details of the HTTP request to be made:
- resourcePath defines the endpoint to be called. The resourcePath will be prefixed by {cluster}.cognitedata.com/api/v1/project/{project} based on the CDF cluster and project. For example, to filter time series in CDF, the resourcePath would be /timeseries/list.
- In queryParameters, enter any additional parameters that should be part of the CDF query.
- method is the request method: POST, GET, PUT, DELETE, or LIST.
- requestTimeoutInMillis is set in milliseconds. If there is no response from the URL within the timeout interval, the task fails with the status TIMED_OUT.
- Specifying the body of the request is optional.
Note: If you use a GET method, you don't require a body parameter.
"type": "cdf",
"parameters": {
    "cdfRequest": {
        "resourcePath": "<CDF resource path>",
        "queryParameters": "<custom parameters>",
        "method": "<HTTP request method>",
        "requestTimeoutInMillis": <milliseconds>,
        "body": "<body>"
    }
}
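As a rough sketch of how these parameters fit together, the Python helpers below assemble a cdf task and show the URL that a resourcePath resolves to under the prefix described above. Both function names are hypothetical, the cluster and project names are placeholders, and passing the body as a dictionary is an assumption about its shape.

```python
def build_cdf_request(resource_path, method, timeout_ms,
                      query_parameters=None, body=None):
    """Assemble a cdf task fragment (hypothetical helper, not an SDK call)."""
    request = {
        "resourcePath": resource_path,
        "method": method,
        "requestTimeoutInMillis": timeout_ms,
    }
    if query_parameters is not None:
        request["queryParameters"] = query_parameters
    # The body is optional and is left out for GET requests.
    if body is not None and method != "GET":
        request["body"] = body
    return {"type": "cdf", "parameters": {"cdfRequest": request}}


def full_url(cluster, project, resource_path):
    """Apply the prefix from the text above to a resourcePath."""
    return f"{cluster}.cognitedata.com/api/v1/project/{project}{resource_path}"


# Assumption: the body is given as a dictionary; "limit" is an example field.
list_task = build_cdf_request("/timeseries/list", "POST", 10000,
                              body={"limit": 10})
print(list_task)
print(full_url("my-cluster", "my-project", "/timeseries/list"))
```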
Dynamic task
Use dynamic tasks when a set of workflow tasks and their execution order are determined at runtime. A dynamic task takes the tasks parameter, which should be an array of valid task definitions.
This array of tasks needs to be determined during the runtime of the workflow and therefore can't be included statically in the workflow definition. Instead, use a reference, meaning a dynamic value that is evaluated when the workflow is executed. The dynamic value can refer either to a part of the input to the execution or to the output of another task in the workflow.
"type": "dynamic",
"parameters": {
    "dynamic": {
        "tasks": [
            <Reference>
        ]
    }
}
The example workflow below illustrates the dynamic task functionality.
- The first task of the workflow is a Cognite Function which generates and outputs an array of task definitions (the Python code executed in the Function returns a dictionary that includes a tasks key).
- The second task is a Dynamic task, which references and executes the set of tasks defined by the preceding step.
"tasks": [
    {
        "externalId": "first-task",
        "type": "function",
        "parameters": {
            "function": {
                "externalId": "my-cognite-function"
            }
        }
    },
    {
        "externalId": "second-task",
        "type": "dynamic",
        "parameters": {
            "dynamic": {
                "tasks": "${first-task.output.tasks}"
            }
        },
        "dependsOn": [{"externalId": "first-task"}]
    }
]
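For the first task, the Python code in the Cognite Function might look roughly like the sketch below. It assumes the function entry point is a handle function that receives the input as data. The "transformations" input key and the generated external IDs are hypothetical and only serve the illustration.

```python
def handle(data):
    """Return task definitions for a dynamic task to pick up.

    Assumption: the input carries a hypothetical "transformations" list
    holding the external IDs of the transformations to run.
    """
    tasks = [
        {
            "externalId": f"run-{external_id}",
            "type": "transformation",
            "parameters": {"transformation": {"externalId": external_id}},
        }
        for external_id in data.get("transformations", [])
    ]
    # The dynamic task reads this key through ${first-task.output.tasks}.
    return {"tasks": tasks}


# Local call for illustration; in a workflow the function task triggers it.
output = handle({"transformations": ["tr-a", "tr-b"]})
print([task["externalId"] for task in output["tasks"]])
```

Because the list is built at runtime, the number of generated tasks can differ between executions of the same workflow definition.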
Subworkflow task
With the subworkflow task, you can run a collection of tasks as a workflow within a workflow. This set of tasks can be defined directly in the current workflow definition or be a reference to another workflow.
Use this task type to streamline your workflows by grouping tasks with shared dependencies into a subworkflow, or to enhance the composability and reusability of your workflows by incorporating subworkflows through references.
List of tasks
Use the tasks parameter and provide an array of valid task definitions to include a list of tasks directly in the workflow definition.
"type": "subworkflow",
"parameters": {
    "subworkflow": {
        "tasks": [
            ...
        ]
    }
}
Reference another workflow
Use the workflowExternalId and version parameters to reference another workflow and to specify the workflow version to be embedded as a subworkflow.
"type": "subworkflow",
"parameters": {
    "subworkflow": {
        "workflowExternalId": "my-other-workflow",
        "version": "v1"
    }
}
A workflow referenced in a subworkflow task can't contain a subworkflow task (limiting the depth to 1).
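The depth limit can be checked before a workflow is referenced. The sketch below is a minimal local check, not something the service or an SDK provides: both function names are hypothetical, and it only inspects the top-level tasks of the referenced workflow.

```python
def contains_subworkflow(tasks):
    """Return True if any task in the list is a subworkflow task."""
    return any(task.get("type") == "subworkflow" for task in tasks)


def check_reference(referenced_tasks):
    """Raise if the workflow to be referenced would break the depth limit."""
    if contains_subworkflow(referenced_tasks):
        raise ValueError("a referenced workflow can't contain a subworkflow task")


# Hypothetical task lists of two workflows that could be referenced.
flat_workflow = [{"externalId": "t1", "type": "function"}]
nested_workflow = [{"externalId": "t2", "type": "subworkflow"}]

check_reference(flat_workflow)  # passes silently
try:
    check_reference(nested_workflow)
except ValueError as error:
    print("rejected:", error)
```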