Write SQL queries
Transform data from the CDF staging area into a data model using built-in and custom Spark SQL queries. Select Switch to SQL editor on the Transform data page to create a transformation in Spark SQL. This article describes the queries and explains how you can load data incrementally.
The SQL editor offers code completion, built-in Spark SQL functions, and Cognite custom SQL functions.
Your changes won't be kept if you switch from the SQL editor to the mapping editor.
Read from a CDF staging table
To select data from a CDF staging table, use the syntax mydb.mytable:
select
*
from
database-name.table-name
If your database or table name contains special characters, enclose the name in backticks, for example `my-db`.`my table`.
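For instance, a query against such a table might look like this (the database and table names are placeholders):
select * from `my-db`.`my table`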
Avoid schema inference
Transformations infer schemas in the CDF staging table, but this process only uses a subset of all the rows in the table. You can avoid schema inference and write a schema fitted to your data.
To avoid schema inference:
select
*
from
cdf_raw("database-name", "table-name")
This returns data with the schema key:STRING, lastUpdatedTime:TIMESTAMP, columns:STRING, where the columns string contains the JSON value encoded as a string.
Here's an example of how to enforce a user-defined schema:
select
get_json_object(columns, '$.externalId') AS externalId,
timestamp(get_json_object(columns, '$.timestamp')) AS timestamp,
double(get_json_object(columns, '$.value')) AS value
from
cdf_raw("database-name", "table-name")
Read from a CDF file
To read data from a file uploaded to the CDF Files API, use the syntax below:
select * from cdf_file_content("file-external-id")
The file must fulfill the following requirements:
- Format must be JSON Lines (also called NDJSON).
- Size must be below 5 GB.
- File must be UTF-8 encoded.
Duplicate rows in the file are removed when processed by Transformations.
Avoid schema inference
To avoid schema inference, use the optional schema-inference parameter (set to true by default):
select * from cdf_file_content("file-external-id", false)
The query returns the data with the schema value: STRING, where the value string contains the JSON value encoded as a string.
For example, use the user-defined schema below:
select
get_json_object(value, '$.externalId') AS externalId,
timestamp(get_json_object(value, '$.timestamp')) AS timestamp,
double(get_json_object(value, '$.value')) AS value
from
cdf_file_content("file-external-id", false)
Read from other CDF resource types
To select other CDF resource types, use the syntax _cdf.resource_type.
select * from _cdf.events
The supported resource types are:
_cdf.events
_cdf.assets
_cdf.files
_cdf.timeseries
_cdf.sequences
_cdf_sequences.<sequence_externalId>
_cdf.datapoints
_cdf.stringdatapoints
_cdf.labels
_cdf.relationships
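For example, to read the rows of a single sequence, reference its external ID with the _cdf_sequences syntax (the external ID below is a placeholder; enclose it in backticks if it contains special characters):
select * from _cdf_sequences.`my-sequence-external-id`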
Load data incrementally
When reading data, your transformation jobs run much faster and more efficiently if they only read the data that has changed since the last time the transformation ran. This reduces the time to run a transformation and allows for more frequent runs. One way to achieve this is to filter on the lastUpdatedTime column to query for the rows that have changed after a specific timestamp. The filter on lastUpdatedTime is pushed down to the underlying resource type (if supported) to run the query more efficiently and only return changes.
There are minor syntax differences between the resource types for this filtering. For example, when reading from a staging table, it could look like this:
select * from mydb.mytable where lastUpdatedTime > to_timestamp(123456)
Instead of encoding the timestamp in the query and keeping it up to date every time new data has been processed, you can use the is_new function to do this for you automatically. The function returns true when a row has changed since the last time the transformation was run and false otherwise. This filters out older rows before the results are processed.
The first time you run a transformation using the query below, all the rows of mytable will be processed:
select * from mydb.mytable where is_new("mydb_mytable", lastUpdatedTime)
If the transformation completes successfully, the second run will only process rows that have changed since the first run.
If the transformation fails, is_new filters the same rows the next time the transformation is run. This ensures that there is no data loss in the transformation from source to destination.
Incremental load is disabled when previewing query results. That is, is_new will always return true for all rows.
Each is_new filter is identified by a name and can be set to any constant string (for example, "mydb_mytable" in the query above). This allows you to differentiate between multiple calls to is_new in the same query and to use is_new to filter on multiple tables. To easily identify the different filters, we recommend using the name of the table as the name of the is_new filter. The name is stored with the transformation and must be unique for that transformation. If you use the same name in two different transformations, they're stored separately so they don't interfere with each other.
It's not common to use multiple is_new filters in the same query. Instead, you'll more likely use is_new on the main resource you're accessing and then join in other resources to enrich the new entries from that main table or resource. If you do use multiple is_new filters, they are applied to each source separately before any join operations are evaluated. This means that for the join to work correctly in this case, both sources have to be updated at the same time.
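A minimal sketch of this behavior, assuming two hypothetical staging tables (the database, table, and column names are placeholders):
select
  a.externalId,
  b.value
from
  mydb.assets_table a
join
  mydb.measurements_table b
  on a.externalId = b.assetExternalId
where
  is_new("mydb_assets_table", a.lastUpdatedTime)
  and is_new("mydb_measurements_table", b.lastUpdatedTime)
Because each filter is evaluated on its own source before the join, a row only appears in the result when both sides have changed since the last successful run.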
Resource types supporting incremental data loading on the lastUpdatedTime column
Incremental data loading is supported by filtering on lastUpdatedTime for the following resource types in addition to staging (see the example after the list):
_cdf.assets
_cdf.events
_cdf.files
_cdf.timeseries
_cdf.relationships
_cdf.datasets
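For example, the same pattern applies when reading a supported resource type such as assets (the filter name here is arbitrary):
select * from _cdf.assets where is_new("cdf_assets", lastUpdatedTime)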
Incremental data loading when using Data Modeling
For data modeling, we don't recommend filtering on timestamp or int64 columns. Instead, it is more efficient to use the variant of the is_new function that uses the sync API to read all changes since the last time the transformation was successfully run. This variant of is_new is used when it references the cdf_nodes(), cdf_edges(), or cdf_data_models() functions instead of a single column like lastUpdatedTime.
For example:
select * from cdf_nodes() where is_new('my_nodes')
where is_new will filter on the output of cdf_nodes.
Each is_new filter is identified by a name and can be set to any constant string (for example, "my_nodes" in the query above).
If you have multiple sources in the same query, you must specify which source is_new references. Do this by providing an alias on the source function, like this:
select * from cdf_nodes() a, cdf_edges() b where is_new('a_nodes', a)
Here the query defines an alias for the cdf_nodes() function and then applies the is_new filter to that alias. This differs from how is_new is used for other resource types, where the filter references a specific column in the source.
The source can be any of the cdf_nodes, cdf_edges, or cdf_data_models functions and can reference a specific view, such as:
select * from cdf_data_models('space', 'datamodel_external_id', 'version', 'view') where is_new('datamodel_view')
is_new translates the query to filter on a cursor that tracks all changes. The cursor is updated every time the transformation runs successfully, in the same way as is_new for other resource types. You don't need to explicitly model support for this filtering in your data model, as it is inherently supported in data modeling. You can also combine this with other filters (where clauses), and the query will use any matching indexes set up in data modeling to keep those additional filters performant.
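As a sketch, assuming a view with a property named status (the space, data model, version, view, and property names are placeholders), the sync-based filter can be combined with ordinary where clauses:
select
  *
from
  cdf_data_models('space', 'datamodel_external_id', 'version', 'view')
where
  is_new('datamodel_view')
  and status = 'active'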
When using is_new with data modeling, the transformation must run at least once every three days to ensure it can find the difference between the last run and the new run. If it doesn't run for three days or more, the transformation falls back to reading all of the input data.
Backfill
To process all the data even if it hasn't changed since the last transformation, change the name of the is_new filter, for example by adding a suffix with an incrementing number ("mydb_mytable_1").
This is especially useful when the logic of the query changes and data that has already been imported needs to be updated accordingly.
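A sketch of such a rename, reusing the staging table from the earlier example:
select * from mydb.mytable where is_new("mydb_mytable_1", lastUpdatedTime)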
Write to specific properties in data modeling
In data modeling, a type node can represent anything from physical entities to abstract concepts like a comment or the type of a physical entity. Every instance (nodes and edges) in data modeling has a type property. This property is a direct relation pointing to the node that defines its intended type.
To populate the type attribute for instances, use the _type keyword in your transformation SQL statement.
The example below uses the _type column to read, write, and filter instances.
select
cast(`externalId` as STRING) as externalId,
node_reference("typeSpace", "newTypeNodeExtId") as _type,
_type as previousType,
cast(`name` as STRING) as name
from
cdf_data_models("spaceExtId", "model", "1", "Facility")
where
_type = node_reference("typeSpace", "oldTypeNodeExtId")
The _type property belongs to the instance and isn't associated with any view. You can still name a view property "type" and reference it using the type keyword.
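As a sketch, assuming the Facility view also defines a property named type (this property is hypothetical), both can be read in the same query:
select
  externalId,
  _type as instanceType,
  type as facilityType
from
  cdf_data_models("spaceExtId", "model", "1", "Facility")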
For more information, see Type nodes in data modeling.
Custom SQL functions
In addition to the built-in Spark SQL functions, we also provide a set of custom SQL functions to help you write efficient transformations.
When a function expects var_args, it allows a variable number of arguments of any type, including the star (*) wildcard.
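For example, a var_args function such as to_metadata (assuming it is among the custom functions available in your project) can be called with the star wildcard to build a metadata structure from all columns of a staging table:
select to_metadata(*) as metadata from mydb.mytable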