Read from a CDF staging table
To select data from a CDF staging table, use the syntax mydb.mytable. If a database or table name contains special characters, enclose the name in backticks, for example `my-db`.`my table`.
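For instance, a minimal read (the database and table names are placeholders):

```sql
-- Read all rows from the staging table "mytable" in the database "mydb".
select * from mydb.mytable

-- Enclose names containing special characters in backticks.
select * from `my-db`.`my table`
```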
Avoid schema inference
Transformations infer schemas in the CDF staging table, but this process only uses a subset of all the rows in the table. You can avoid schema inference and write a schema fitted to your data. A query that avoids schema inference returns the columns key:STRING, lastUpdatedTime:TIMESTAMP, columns:STRING, where the columns string contains the JSON value encoded as a string.
Here’s an example of how to enforce a user-defined schema:
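A minimal sketch, assuming a .raw table suffix to bypass inference (the exact syntax isn't shown in this excerpt) and using Spark's from_json to apply an explicit schema:

```sql
-- Read the staging table without schema inference (the .raw suffix is an
-- assumption), then parse the JSON-encoded "columns" string with a
-- user-defined schema.
select
  key,
  lastUpdatedTime,
  from_json(columns, 'myNumber INT, myName STRING') as parsed
from mydb.mytable.raw
```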
Read from a CDF file
To read data from a file uploaded to the CDF Files API, use the syntax below. The file must meet these requirements:

- The format must be JSON Lines (also called NDJSON).
- The size must be below 5 GB.
- The file must be UTF-8 encoded.

Duplicate rows in the file are removed when processed by Transformations.
Avoid schema inference
To avoid schema inference, use the optional schema-inference parameter (set to true by default). With inference disabled, the query returns a single column, value: STRING, where the value string contains the JSON value encoded as a string.
For example, use the user-defined schema below:
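As a sketch, the JSON-encoded value string can be parsed with Spark's from_json and an explicit schema; a literal stands in here for the file's value column, since the file read syntax isn't shown in this excerpt:

```sql
-- Parse a JSON-encoded string with a user-defined schema.
-- Replace the literal with the "value" column of the file read.
select from_json('{"id": 1, "name": "pump"}', 'id INT, name STRING') as parsed
```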
Read from other CDF resource types
To select other CDF resource types, use the syntax _cdf.resource_type:

- _cdf.events
- _cdf.assets
- _cdf.files
- _cdf.timeseries
- _cdf.sequences
- _cdf_sequences.<sequence_externalId>
- _cdf.datapoints
- _cdf.stringdatapoints
- _cdf.labels
- _cdf.relationships
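For example, a minimal read of the assets resource type:

```sql
-- Read all assets in the project.
select * from _cdf.assets
```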
Load data incrementally
When reading data, your transformation jobs will run much faster and more efficiently if they only have to read the data that has changed since the last time the transformation job ran. This reduces the time to run a transformation and allows for more frequent runs. One way to achieve this is to filter on the lastUpdatedTime column to query for the rows that have changed after a specific timestamp. The filter on lastUpdatedTime is pushed down to the underlying resource type (if supported) to run the query more efficiently and only return changes.
There are some minor syntax differences between resource types for this filtering. For example, when reading from staging tables, it could look like this:
```sql
select * from mydb.mytable where lastUpdatedTime > to_timestamp(123456)
```
Instead of encoding the timestamp in the query and keeping it up to date every time new data has been processed, you can use the is_new function to do this for you automatically. The function returns true when a row has changed since the last time the transformation was run and false otherwise. This filters out older rows before the results are processed.
The first time you run a transformation using the query below, all the rows of mytable will be processed:
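A minimal sketch of such a query (the filter name matches the one referenced below):

```sql
-- Process only rows changed since the last successful run.
select * from mydb.mytable where is_new("mydb_mytable", lastUpdatedTime)
```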
On later runs, only rows that have changed since the last successful run are processed; if a run fails, is_new filters the same rows the next time the transformation is run. This ensures that there is no data loss in the transformation from source to destination.
Incremental load is disabled when previewing query results. That is, is_new will always return true for all rows.

Each is_new filter is identified by a name and can be set to any constant string (for example, "mydb_mytable" in the above example). This allows you to differentiate between multiple calls to is_new in the same query and use is_new to filter on multiple tables. To easily identify the different filters, we recommend that you use the name of the table as the name of the is_new filter. The name is stored with the transformation and must be unique for the specific transformation. If you use the same name in two different transformations, they're stored separately so they don't interfere with each other.
It's not common to use multiple is_new filters in the same query. Instead, it's more likely you'll use is_new on the main resource you're accessing and then join in other resources to enrich any new entries from the main table or resource. If you use multiple is_new filters, they are applied to each source separately before any join operations are evaluated. This means that for the join to work correctly in this case, both sources have to be updated at the same time.

Resource types supporting incremental data loading on the lastUpdatedTime column
Incremental data loading is supported by filtering on lastUpdatedTime for the following resource types in addition to staging:
- _cdf.assets
- _cdf.events
- _cdf.files
- _cdf.timeseries
- _cdf.relationships
- _cdf.datasets
Incremental data loading when using Data Modeling
For data modeling, we don't recommend filtering on timestamp or int64 columns. Instead, it's more efficient to use the variant of the is_new function that uses the sync API to read all changes since the last time the transformation was successfully run. This variant of is_new is used when it references the cdf_nodes(), cdf_edges(), or cdf_data_models() functions instead of a single column like lastUpdatedTime.
For example:
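A minimal sketch, assuming the name-only form of is_new for data modeling sources:

```sql
-- Read only nodes that changed since the last successful run.
select * from cdf_nodes() where is_new("my_nodes")
```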
Here, is_new will filter on the output of cdf_nodes().
Each is_new filter is identified by a name and can be set to any constant string (for example, "my_nodes" in the above example).
If you have multiple sources in the same query, you must specify which source the is_new is referencing. This is done by providing an alias on the source function, like this:
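A sketch, assuming is_new accepts the source alias as a second argument:

```sql
-- Alias the source and point is_new at that alias.
select * from cdf_nodes() n where is_new("my_nodes", n)
```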
This sets an alias on the cdf_nodes() function and then applies the is_new filter to that alias. This is different from how is_new is used for other resource types, where it references a specific column in the source.
The source can be any of the cdf_nodes, cdf_edges or cdf_data_models functions, and can reference a specific view, such as:
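For instance, a sketch with a view-backed source (the space, externalId, and version are placeholders):

```sql
-- Read changed nodes through a specific view.
select * from cdf_nodes("my_space", "MyView", "1") n where is_new("my_view_nodes", n)
```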
is_new translates the query to filter on a cursor that tracks all changes. The cursor is updated every time the transformation is successfully run, in the same way as is_new for other resource types. You don’t need to explicitly model support for this filtering in your data model, as it is inherently supported in data modeling. You can also combine this with other filters (where clauses), and it will use any matching indexes set up in data modeling to ensure performance of any optional filters.
Backfill
To process all the data even if it hasn't changed since the last transformation, change the name of the is_new filter, for example, by adding a postfix with an incrementing number (e.g. "mydb_mytable_1").
This is especially useful when the logic of the query changes and data that has already been imported needs to be updated accordingly.
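For instance, renaming the filter from the earlier sketch forces a full re-read:

```sql
-- A new filter name starts from an empty state, so all rows are processed again.
select * from mydb.mytable where is_new("mydb_mytable_1", lastUpdatedTime)
```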
Write to specific properties in data modeling
In data modeling, a type node can represent anything from physical entities to abstract concepts like a comment or the type of a physical entity. Every instance (nodes and edges) in data modeling has a type property. This property is a direct relation pointing to the node that defines its intended type. To populate the type attribute for instances, use the _type keyword in your transformation SQL statement. The example below uses the _type column to read, write, and filter instances.
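A hypothetical sketch (the space and externalIds are placeholders), reading instances and filtering on their _type:

```sql
-- Read nodes and their type, keeping only instances typed as "pump".
select externalId, _type
from cdf_nodes()
where _type = node_reference("my_space", "pump")
```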
_type is a property of the instance and isn’t associated with any view. You can name a view property “type”, and it can be referenced using the type keyword.
For more information, see Type nodes in data modeling.
Custom SQL functions
In addition to the built-in Spark SQL functions, we also provide a set of custom SQL functions to help you write efficient transformations.

When a function expects var_args, it allows a variable number of arguments of any type, including star *.

get_names
- get_names(var_args): Array[String]

Returns an array of the field names of a struct or row.
cast_to_strings
- cast_to_strings(var_args): Array[String]

Casts the arguments to an array of strings. Arrays, structs, and maps are cast to JSON strings.
to_metadata
- to_metadata(var_args): Map[String, String]
Creates metadata-compatible maps; in practice, equivalent to map_from_arrays(get_names(var_args), cast_to_strings(var_args)). Use this function when you want to transform your columns or structures into a format that fits the metadata field in CDF.
Example
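A minimal sketch, packing every column of a staging row into a metadata map:

```sql
-- Build a Map[String, String] from all columns of the row.
select to_metadata(*) as metadata from mydb.mytable
```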
to_metadata_except
- to_metadata_except(excludeFilter: Array[String], var_args): Map[String, String]

Returns a metadata structure (Map[String, String]) where strings found in excludeFilter exclude keys from var_args.
Use this function when you want to put most, but not all, columns into metadata, for example to_metadata_except(array("someColumnToExclude"), *)
Example
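For instance, a sketch that keeps everything except a hypothetical internal column:

```sql
-- Metadata from all columns except "internal_id".
select to_metadata_except(array("internal_id"), *) as metadata from mydb.mytable
```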
asset_ids
Attempts to find asset names under the given criteria and returns the IDs of the matching assets. Four variations are available.

- asset_ids(assetNames: Array[String]): Array[BigInt]

Attempts to find the given assetNames in all assets.

- asset_ids(assetNames: Array[String], rootAssetName: String): Array[BigInt]

Attempts to find the given assetNames in the asset hierarchy with rootAssetName as their root asset.

- asset_ids(assetNames: Array[String], datasetIds: Array[Long]): Array[BigInt]

Attempts to find the given assetNames that belong to the datasetIds.

- asset_ids(assetNames: Array[String], rootAssetName: String, datasetIds: Array[Long]): Array[BigInt]

Attempts to find the given assetNames that belong to the datasetIds under the rootAssetName.
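A short sketch (the asset and root names are placeholders):

```sql
-- Resolve asset names to internal ids under a given root asset.
select asset_ids(array("pump-01", "pump-02"), "plant-root") as assetIds
```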
is_new
- is_new(name: String, version: long)

Returns true if the version provided is higher than the version found with the specified name, based on the last time the transformation was run. version can be any column of datatype long with only incrementing values ingested. A popular example is the lastUpdatedTime column.
- If the transformation completes successfully, the next transformation job only processes rows that have changed since the start of the last successfully completed transformation job.
- If the transformation fails, is_new processes all rows that have changed since the start of the last successful run. This ensures no data loss in the transformation from source to destination. See also Load data incrementally.
dataset_id
- dataset_id(externalId: String): BigInt
Fetches the id of the data set identified by externalId and returns the id if the externalId exists.
Example
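A minimal sketch, resolving a hypothetical data set external id:

```sql
-- Look up the internal id of a data set by its external id.
select dataset_id("my_dataset") as dataSetId
```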
cdf_assetSubtree
- cdf_assetSubtree(externalId: String or id: BigInt): Table[Asset]

Returns all assets in the subtree of the asset with the given externalId or id, including the asset itself.
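For instance, a sketch with a hypothetical root asset:

```sql
-- All assets under (and including) the asset with this external id.
select * from cdf_assetSubtree("plant-root")
```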
cdf_nodes
- cdf_nodes(space of the view: String, externalId of the view: String, version of the view: String): Table[Nodes]
- cdf_nodes(): Table[Nodes]
cdf_nodes() returns the space and externalId of all nodes in the CDF project.

cdf_nodes("space of the view", "externalId of the view", "version of the view") returns a table with nodes ingested with the view as reference. The table contains space and externalId columns and a column for each property in the view.
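For example, a sketch reading nodes through a hypothetical view:

```sql
-- Nodes ingested with the view MyView (version 1) in space my_space.
select * from cdf_nodes("my_space", "MyView", "1")
```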
cdf_edges
- cdf_edges(space of the view: String, externalId of the view: String, version of the view: String): Table[Edges]
- cdf_edges(): Table[Edges]
cdf_edges() returns the space, externalId, startNode, endNode, and type of all edges in a CDF project.

cdf_edges("space of the view", "externalId of the view", "version of the view") returns a table with edges ingested with the view as reference. The table contains space, externalId, startNode, endNode, and type columns and a column for each property in the view.
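For example, a sketch reading edges through a hypothetical view:

```sql
-- Edges ingested with the view MyEdgeView (version 1) in space my_space.
select startNode, endNode from cdf_edges("my_space", "MyEdgeView", "1")
```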
node_reference
- node_reference(space: String, externalId: String): STRUCT<space: STRING, externalId: STRING>
- node_reference(externalId: String): STRUCT<space: STRING, externalId: STRING>
To reference a node, you need the space externalId of the node and the node externalId. Typically, you reference a node when writing or filtering edges based on startNode and endNode.

node_reference also accepts the single parameter externalId of the node. In that case, the target/instance space set at the transformation is used as the space externalId of the node.
Example
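A partial sketch of the columns used when writing edges (other required columns omitted, names are placeholders):

```sql
-- Reference start and end nodes by space and external id.
select
  node_reference("my_space", "node-a") as startNode,
  node_reference("my_space", "node-b") as endNode
```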
type_reference
- type_reference(space: String, externalId: String): STRUCT<space: STRING, externalId: STRING>
- type_reference(externalId: String): STRUCT<space: STRING, externalId: STRING>
Every edge has a type. To filter edges based on type, use type_reference and provide the space externalId and the edge type externalId. If you're writing edges with a view reference, you must specify the edge type using type_reference.

type_reference also accepts the single parameter externalId of the edge type. In that case, the target/instance space set at the transformation is used as the space externalId of the edge type.
Example
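A sketch filtering edges on a hypothetical type:

```sql
-- Keep only edges whose type matches the given space and external id.
select * from cdf_edges() where type = type_reference("my_space", "connected-to")
```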
cdf_data_models
- cdf_data_models(data model space: String, data model externalId: String, data model version: String, type externalId: String): Table[Nodes]
- cdf_data_models(data model space: String, data model externalId: String, data model version: String, type externalId: String, property in type containing the relationship: String): Table[Edges]

Use cdf_data_models to retrieve data from a data model's types and relationships.
To retrieve data from a type in your data model, provide the data model's space, externalId, version, and the externalId of the type as input parameters to cdf_data_models.

To retrieve data from a relationship in your data model, provide the data model's space, externalId, version, the externalId of the type containing the relationship, and the name of the relationship property in the type as input parameters to cdf_data_models.
Example
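A sketch reading a hypothetical type and relationship from a data model:

```sql
-- Nodes of the type "Pump" in data model MyModel (version 1).
select * from cdf_data_models("my_space", "MyModel", "1", "Pump")

-- Edges for the relationship property "connectedTo" on the "Pump" type.
select * from cdf_data_models("my_space", "MyModel", "1", "Pump", "connectedTo")
```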
try_get_unit
- try_get_unit(unit alias: String, quantity: String): String
- try_get_unit(unit alias: String): String
Returns the unit externalId as defined by the Cognite unit catalog, based on an alias and an optional quantity. Each unit in the catalog is associated with a quantity and a list of alias names. For instance, degrees Celsius has externalId temperature:deg_c, is of quantity temperature, and has a list of alias names that includes deg_c, ºC, and Celsius.
Within a quantity, the unit alias is unique. If the unit alias doesn't exist for the quantity, the function returns null. If the quantity isn't specified, the function only returns a value if the unit alias is unique across all quantities; otherwise, it returns null.
Example
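A minimal sketch, resolving the Celsius unit by two of its aliases:

```sql
-- Look up degrees Celsius by alias within the temperature quantity.
select try_get_unit("Celsius", "temperature") as by_name,
       try_get_unit("deg_c", "temperature") as by_short_alias
```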
Both lookups return temperature:deg_c.
convert_unit
- convert_unit(value: Number, source unit externalId: String, target unit externalId: String): Double
- convert_unit(value: Number, source unit alias: String, target unit alias: String, quantity: String): Double
- convert_unit(value: Number, source unit alias: String, target unit alias: String): Double
Converts a value between units of the same quantity. If the value is null, the function returns null.
The source and target units can be specified using either the externalId or a unit alias of each unit. The quantity can also be specified to verify that the aliases or external IDs refer to the right quantity; the function returns an error if they don't match, and the call can fail if no unit with the given alias exists for that quantity.

When aliases are used without specifying the quantity, an alias may be ambiguous. In these cases, the method tries to resolve the ambiguity by choosing aliases that have a quantity in common, eliminating the ambiguity whenever possible. If the externalIds or unit aliases don't have a quantity in common, the value can't be converted between them and the conversion fails. The conversion also fails if the units don't exist.
Example
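A sketch converting 10 degrees Celsius to Fahrenheit; temperature:deg_f is assumed here to be the Fahrenheit externalId:

```sql
-- Convert by externalId and by alias (see the note on "F" below).
select convert_unit(10.0, "temperature:deg_c", "temperature:deg_f") as by_external_id,
       convert_unit(10.0, "Celsius", "F") as by_alias
```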
Both conversions return 50.0.
Notice in the last example that "F" is an ambiguous alias, as it could refer to either degrees Fahrenheit or farad. In this case, the convert_unit method automatically selects degrees Fahrenheit because it is a unit of the quantity temperature, as is degrees Celsius. The method therefore succeeds and converts between degrees Celsius and degrees Fahrenheit as expected.
Disabled Spark SQL functions
We currently don't support using these Spark SQL functions when you transform data:

- xpath
- xpath_boolean
- xpath_double
- xpath_float
- xpath_int
- xpath_number
- xpath_short
- xpath_string
- xpath_long
- java_method
- reflect