Tasks in data workflows

This section explains the configuration required to trigger a task in a workflow.

Tip: For more information on the API specifications for tasks in data workflows, see API specifications.

Task definitions are the core components of a workflow definition. A task in a workflow definition has the following structure:

"tasks": [
  {
    "externalId": "<Task External Id>",
    "type": "<Type of task>",
    "parameters": {
      ...
    },
    "retries": 3,
    "timeout": 3600,
    "dependsOn": [
      ...
    ]
  }
  ...
]
| Name | Description |
| --- | --- |
| externalId | A unique ID to reference a task within a workflow. |
| type | The type of task to run: transformation, function, simulation, cdf, dynamic, or subworkflow. |
| parameters | The parameters for the task. Each type requires a distinct set of parameters. |
| dependsOn | The tasks that must run before this task is triggered, given as an array of task external IDs. |
| timeout | The maximum duration a task can run before it's terminated. The value is always in seconds. The default value is 3600 seconds (1 hour). |
| retries | The number of retries if the task fails. You can set any value from 0 to 10. The default value is 3. |
| onFailure | Defines how task failures and timeouts are handled, that is, whether the task must complete for the workflow to continue. The default value is abortWorkflow, which marks the workflow as FAILED if the task fails or times out. Set it to skipTask to mark the task as COMPLETED_WITH_ERRORS if it fails or times out and continue the workflow execution. |
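The defaults and limits in the table can be captured in a small helper. The following is a minimal sketch in Python, not part of any Cognite SDK: the function name make_task and its arguments are hypothetical, and the dependsOn entries are written as objects with an externalId key, following the dynamic task example on this page.

```python
from typing import Any, Dict, List, Optional

VALID_ON_FAILURE = {"abortWorkflow", "skipTask"}


def make_task(
    external_id: str,
    task_type: str,
    parameters: Dict[str, Any],
    depends_on: Optional[List[str]] = None,
    retries: int = 3,
    timeout: int = 3600,
    on_failure: str = "abortWorkflow",
) -> Dict[str, Any]:
    """Build a task definition dict (hypothetical helper) with the documented defaults."""
    # The table states that retries accepts any value from 0 to 10.
    if not 0 <= retries <= 10:
        raise ValueError("retries must be between 0 and 10")
    if on_failure not in VALID_ON_FAILURE:
        raise ValueError(f"onFailure must be one of {sorted(VALID_ON_FAILURE)}")
    return {
        "externalId": external_id,
        "type": task_type,
        "parameters": parameters,
        "retries": retries,
        "timeout": timeout,  # always in seconds
        "onFailure": on_failure,
        # Assumption: each dependency is an object with an externalId key.
        "dependsOn": [{"externalId": dep} for dep in (depends_on or [])],
    }
```

Calling make_task("load", "transformation", {...}, depends_on=["extract"]) yields a task that keeps the default retries, timeout, and onFailure values and runs after the task named extract.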

Task types

CDF Transformation task

To orchestrate a CDF Transformation task, set the type to transformation. Under parameters, set externalId to the external ID of the CDF Transformation to trigger.

"type": "transformation",
"parameters": {
  "transformation": {
    "externalId": "<CDF Transformation externalId>",
    "concurrencyPolicy": "fail"
  }
}

The optional parameter concurrencyPolicy determines the behavior of a task if the triggered transformation is already running. The following options are available:

  • fail: The task fails if another instance of the transformation is currently running. This is the default option.

  • waitForCurrent: The task will pause and wait for the already running transformation to complete. Once the transformation is completed, the task is completed. This mode is useful in preventing redundant transformation runs.

  • restartAfterCurrent: The task waits for the ongoing transformation to complete. After completion, the task restarts the transformation. This mode ensures that downstream tasks can use the most recent data.
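The three policies can be summarized in a short Python sketch. It is a toy model of the behavior described in the list, not the service's implementation. The function names are hypothetical, and the sketch assumes that when no instance is running, every policy simply starts the transformation.

```python
import json

CONCURRENCY_POLICIES = ("fail", "waitForCurrent", "restartAfterCurrent")


def transformation_parameters(external_id: str, concurrency_policy: str = "fail") -> dict:
    """Build the parameters object for a transformation task."""
    if concurrency_policy not in CONCURRENCY_POLICIES:
        raise ValueError(f"concurrencyPolicy must be one of {CONCURRENCY_POLICIES}")
    return {
        "transformation": {
            "externalId": external_id,
            "concurrencyPolicy": concurrency_policy,
        }
    }


def planned_actions(concurrency_policy: str, already_running: bool) -> list:
    """Return the steps the task takes, as described in the list of policies."""
    if concurrency_policy not in CONCURRENCY_POLICIES:
        raise ValueError(f"concurrencyPolicy must be one of {CONCURRENCY_POLICIES}")
    if not already_running:
        # Assumption: with no running instance, the transformation is started.
        return ["start"]
    return {
        "fail": ["fail"],
        "waitForCurrent": ["wait"],
        "restartAfterCurrent": ["wait", "start"],
    }[concurrency_policy]


if __name__ == "__main__":
    print(json.dumps(transformation_parameters("my-transformation", "waitForCurrent"), indent=2))
```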

Cognite Function task

To orchestrate a Cognite Function task, set the type to function. Under parameters, set externalId to the external ID of the function to trigger. Use the data parameter to provide input to the function. It accepts a JSON string that is passed as the data argument to the function when it's triggered during the workflow execution.

"type": "function",
"parameters": {
  "function": {
    "externalId": "<Cognite Function externalId>",
    "data": "<JSON object to be passed as input to the Cognite Function>"
  },
  "isAsyncComplete": false
}

Asynchronous task completion

A Cognite Function task can trigger a process running in a third-party system or prompt a user to take manual action as part of a workflow. For this type of usage, asynchronous completion of Cognite Function tasks is supported through the isAsyncComplete parameter.

When the isAsyncComplete parameter is set to false (the default), the workflow triggers the Cognite Function, and the status of the workflow task reflects the status of the function execution. For example, the task is marked as COMPLETED in the workflow when the function has run and completed successfully.

When the isAsyncComplete parameter is set to true, the workflow triggers the Cognite Function, and the task status remains IN_PROGRESS until the workflow receives an explicit callback to the update task status endpoint that changes the status to COMPLETED, FAILED, or FAILED_WITH_TERMINAL_ERROR.

The example in the diagram illustrates an asynchronous task completion.

Note: You must implement both the logic in the Cognite Function that triggers the third-party process and the logic in the third-party process that updates the task status in the data workflow.

[Figure: Asynchronous task completion]
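The callback that a third-party process sends could be prepared along these lines. This is a hedged sketch in Python: only the three allowed statuses come from the text above. The function name and the body layout (an items list with id, status, and output fields) are assumptions made for illustration, so check the API specifications for the actual request shape before using it.

```python
from typing import Any, Dict, Optional

# Statuses the text lists as valid targets for an asynchronous task.
ALLOWED_STATUSES = {"COMPLETED", "FAILED", "FAILED_WITH_TERMINAL_ERROR"}


def build_status_update(
    task_id: str, status: str, output: Optional[Dict[str, Any]] = None
) -> Dict[str, Any]:
    """Build a hypothetical request body for the update task status endpoint."""
    if status not in ALLOWED_STATUSES:
        raise ValueError(f"status must be one of {sorted(ALLOWED_STATUSES)}")
    # Assumption: the endpoint accepts a list of items, one per task.
    item: Dict[str, Any] = {"id": task_id, "status": status}
    if output is not None:
        item["output"] = output
    return {"items": [item]}
```

The third-party process would send this body with its own authenticated HTTP client once its work has finished. Until then, the task stays IN_PROGRESS.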

Cognite simulation task

To orchestrate a Cognite simulation task, set the type to simulation. Under parameters, set routineExternalId to the external ID of the simulator routine to trigger. Use the inputs parameter to override inputs in the routine. It accepts a list of input overrides that are passed to the routine when it's triggered during workflow execution. Use the runTime parameter to specify when the simulation should run.

"type": "simulation",
"parameters": {
  "simulation": {
    "routineExternalId": "<Cognite simulator routine externalId>",
    "inputs": "<List of input overrides>",
    "runTime": "<Time at which the simulation should run>"
  }
}

CDF request task

To orchestrate a CDF task, set the type to cdf. This task type makes authenticated requests to the CDF APIs using the credentials provided when the workflow is run.

Under parameters, fill in the details of the HTTP request to be made:

  • resourcePath defines the endpoint to be called. It is prefixed by {cluster}.cognitedata.com/api/v1/project/{project} based on the CDF cluster and project. For example, to filter time series in CDF, the resourcePath would be /timeseries/list.
  • In queryParameters, enter any additional parameters that should be part of the CDF query.
  • In method, set the request method: POST, GET, PUT, DELETE, or LIST.
  • requestTimeoutInMillis sets how long, in milliseconds, to wait for a response after the request is sent. If no response arrives within this interval, the task fails with the status TIMED_OUT.
  • The body of the request is optional.

Note: GET requests don't require a body parameter.

"type": "cdf",
"parameters": {
  "cdfRequest": {
    "resourcePath": "<CDF resource path>",
    "queryParameters": "<custom parameters>",
    "method": "<HTTP request method>",
    "requestTimeoutInMillis": <milliseconds>,
    "body": "<body>"
  }
}
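To make the prefixing of resourcePath concrete, the sketch below assembles the full URL and the cdfRequest parameters in Python. The function names are hypothetical, the https scheme is an assumption, and the prefix is copied as written in the text above, so confirm it against the API specifications.

```python
from typing import Any, Dict, Optional
from urllib.parse import urlencode


def build_cdf_url(
    cluster: str,
    project: str,
    resource_path: str,
    query_parameters: Optional[Dict[str, Any]] = None,
) -> str:
    """Join the cluster/project prefix, the resource path, and any query parameters."""
    # Prefix as given in the text; the https scheme is an assumption.
    prefix = f"https://{cluster}.cognitedata.com/api/v1/project/{project}"
    url = prefix + "/" + resource_path.lstrip("/")
    if query_parameters:
        url += "?" + urlencode(query_parameters)
    return url


def cdf_request_parameters(
    resource_path: str,
    method: str,
    request_timeout_in_millis: int,
    query_parameters: Optional[Dict[str, Any]] = None,
    body: Optional[Dict[str, Any]] = None,
) -> Dict[str, Any]:
    """Build the parameters object for a cdf task, omitting optional fields that are unset."""
    request: Dict[str, Any] = {
        "resourcePath": resource_path,
        "method": method,
        "requestTimeoutInMillis": request_timeout_in_millis,
    }
    if query_parameters:
        request["queryParameters"] = query_parameters
    if body is not None:  # the body is optional, for example for GET requests
        request["body"] = body
    return {"cdfRequest": request}
```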

Dynamic task

Use a dynamic task when a set of workflow tasks and their execution order are determined at runtime. A dynamic task takes the tasks parameter, which must be an array of valid task definitions.

Because this array is only known while the workflow runs, it can't be included statically in the workflow definition. Instead, use a reference: a dynamic value that is evaluated when the workflow is run. The reference can point to a part of the input to the execution or to the output of another task in the workflow.

"type": "dynamic",
"parameters": {
  "dynamic": {
    "tasks": [
      <Reference>
    ]
  }
}
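How a reference is evaluated can be pictured with a toy resolver. The Python sketch below is an illustration only, not the workflow service's implementation. It assumes references use the ${task.output.key} form seen in the example workflow on this page and that each dot-separated segment is a plain dictionary key. The resolve function and the context layout are hypothetical.

```python
import re
from typing import Any, Dict

# Matches a value that consists entirely of one ${...} reference.
REFERENCE = re.compile(r"^\$\{(.+)\}$")


def resolve(value: Any, context: Dict[str, Any]) -> Any:
    """Replace a ${a.b.c} reference with context["a"]["b"]["c"]; return other values unchanged."""
    match = REFERENCE.match(value) if isinstance(value, str) else None
    if match is None:
        return value
    current: Any = context
    for key in match.group(1).split("."):
        current = current[key]  # raises KeyError if the reference can't be resolved
    return current


# Hypothetical execution context: outputs of completed tasks keyed by task external ID.
context = {"first-task": {"output": {"tasks": [{"externalId": "generated-task"}]}}}
resolved = resolve("${first-task.output.tasks}", context)
```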

The example workflow below illustrates the dynamic task functionality.

  1. The first task of the workflow is a Cognite Function which generates and outputs an array of task definitions (the Python code run in the Function returns a dictionary that includes a tasks key).
  2. The second task is a Dynamic task, which references and runs the set of tasks defined by the preceding step.
"tasks": [
  {
    "externalId": "first-task",
    "type": "function",
    "parameters": {
      "function": {
        "externalId": "my-cognite-function"
      }
    }
  },
  {
    "externalId": "second-task",
    "type": "dynamic",
    "parameters": {
      "dynamic": {
        "tasks": "${first-task.output.tasks}"
      }
    },
    "dependsOn": [{"externalId": "first-task"}]
  }
]
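The function behind first-task could look roughly like the Python sketch below. It assumes the usual handle(client, data) entry point of a Cognite Function and returns a dictionary with a tasks key, as the first step of the example requires. The input key transformationExternalIds and the generated task names are hypothetical.

```python
from typing import Any, Dict, List, Optional


def handle(client: Any, data: Optional[Dict[str, Any]]) -> Dict[str, List[Dict[str, Any]]]:
    """Return one transformation task definition per external ID found in the input."""
    # Hypothetical input key: a list of transformation external IDs to run.
    transformation_ids = (data or {}).get("transformationExternalIds", [])
    tasks = [
        {
            "externalId": f"run-{external_id}",
            "type": "transformation",
            "parameters": {"transformation": {"externalId": external_id}},
        }
        for external_id in transformation_ids
    ]
    # The dynamic task reads this list through ${first-task.output.tasks}.
    return {"tasks": tasks}
```

Because the list is built from the input at runtime, the same workflow definition can run a different set of transformations on each execution.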

Subworkflow task

With the subworkflow task, you can run a collection of tasks as a workflow within a workflow. This set of tasks can be defined directly in the current workflow definition or be a reference to another workflow.

Use this task type to streamline your workflows by grouping tasks with shared dependencies into a subworkflow, or to make your workflows more composable and reusable by incorporating subworkflows through references.

List of tasks

Use the tasks parameter and provide an array of valid task definitions to include a list of tasks directly in the workflow definition.

"type": "subworkflow",
"parameters": {
  "subworkflow": {
    "tasks": [
      ...
    ]
  }
}

Reference another workflow

Use the workflowExternalId and version parameters to reference another workflow and to specify the workflow version to be embedded as a subworkflow.

"type": "subworkflow",
"parameters": {
  "subworkflow": {
    "workflowExternalId": "my-other-workflow",
    "version": "v1"
  }
}
Note: A workflow referenced in a subworkflow task can't contain a subworkflow task (limiting the depth to 1).
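The depth limit can be checked locally before a definition is submitted. The Python sketch below is a hypothetical pre-flight check, not a Cognite SDK feature. It assumes the workflow definitions are available in a dictionary keyed by (workflowExternalId, version), each holding a list of task definitions.

```python
from typing import Any, Dict, List, Tuple

WorkflowKey = Tuple[str, str]  # (workflowExternalId, version)


def find_nested_subworkflows(
    key: WorkflowKey, definitions: Dict[WorkflowKey, List[Dict[str, Any]]]
) -> List[str]:
    """Return a message for each referenced workflow that itself contains a subworkflow task."""
    problems = []
    for task in definitions[key]:
        if task.get("type") != "subworkflow":
            continue
        sub = task["parameters"]["subworkflow"]
        if "workflowExternalId" not in sub:
            continue  # an inline list of tasks, not a reference to another workflow
        referenced = (sub["workflowExternalId"], sub["version"])
        nested = [t["externalId"] for t in definitions[referenced] if t.get("type") == "subworkflow"]
        if nested:
            problems.append(
                f"{task['externalId']} references {referenced[0]} ({referenced[1]}), "
                f"which contains subworkflow tasks: {', '.join(nested)}"
            )
    return problems
```

An empty result means that no referenced workflow contains a subworkflow task, so the definition respects the depth limit stated in the note.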