Start with incremental filters
When reading data, your transformation jobs run much faster and more efficiently if they only process data that has changed since the last run. This reduces transformation time and allows for more frequent runs.

Use is_new() for automatic incremental filtering
Instead of manually managing timestamps in your queries, use the is_new() function to automatically filter for changed data. The function returns true when a row has changed since the last successful transformation run and false otherwise.
- First run: All rows are processed.
- Successful subsequent runs: Only rows that changed since the last successful run are processed.
- Failed runs: The same rows are reprocessed to prevent data loss.
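As a minimal sketch of the pattern described above (the table mydb.mytable, the filter name "mydb_mytable", and the lastUpdatedTime column are placeholders; adapt them to your source):

```sql
select *
from mydb.mytable
-- "mydb_mytable" names this incremental filter; lastUpdatedTime is the
-- column compared against the last successful run.
where is_new("mydb_mytable", lastUpdatedTime)
```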
Incremental load is disabled when previewing query results. That is, is_new always returns true for all rows during preview.

Naming is_new filters
Each is_new filter is identified by a unique name (for example, "mydb_mytable"). The name is stored with the transformation and must be unique within that transformation. Use descriptive names based on the table or source to easily identify different filters.
If you use the same name in different transformations, they’re tracked separately and don’t interfere with each other.
It’s not common to use multiple is_new filters in the same query. Instead, use is_new on the main resource you’re accessing, then join in other resources to enrich the new entries. If you do use multiple is_new filters, they apply to each source separately before any join operations, meaning both sources must be updated at the same time for joins to work correctly.

Resource types supporting incremental loading
Incremental data loading with lastUpdatedTime filtering is supported for:
- RAW tables (mydb.mytable)
- _cdf.assets
- _cdf.events
- _cdf.files
- _cdf.timeseries
- _cdf.relationships
- _cdf.datasets
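The same pattern applies to the built-in CDF tables. For example, a sketch that reads only changed events (the filter name "cdf_events" is a placeholder):

```sql
select *
from _cdf.events
where is_new("cdf_events", lastUpdatedTime)
```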
Incremental loading with data modeling
For data modeling sources, use the is_new() variant with cdf_nodes(), cdf_edges(), or cdf_data_models() to rely on sync cursors instead of timestamp filters. This approach is more efficient than filtering on timestamp or int64 columns.
In this variant, is_new references the source by its table alias rather than by a filter name and a timestamp column.
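A sketch of the alias-based form, assuming is_new accepts the table alias for data modeling sources (the alias n is arbitrary):

```sql
select n.*
from cdf_nodes() n
-- Pass the alias instead of a name/timestamp pair; sync cursors track progress.
where is_new(n)
```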
Backfilling data
To reprocess all data even if it hasn’t changed, change the name of the is_new filter (for example, add an incrementing suffix like "mydb_mytable_2"). This is useful when query logic changes and previously imported data needs updating.
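For example, bumping a suffix on the filter name forces a full reprocess on the next run (table and filter names are placeholders):

```sql
-- Previously: where is_new("mydb_mytable", lastUpdatedTime)
select *
from mydb.mytable
-- A renamed filter has no stored state, so every row is treated as new.
where is_new("mydb_mytable_2", lastUpdatedTime)
```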
Avoid double reads from the same table
If multiple sub-queries read from the same source table, combine them into a single query instead of using UNION ALL. This avoids scanning the same data twice in Spark and reduces executor load.
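As an illustration (table and column names are hypothetical), the two sub-queries below scan the same table twice, while the rewrite scans it once:

```sql
-- Scans mydb.mytable twice:
select id, name from mydb.mytable where kind = 'pump'
union all
select id, name from mydb.mytable where kind = 'valve'

-- Scans it once:
select id, name
from mydb.mytable
where kind in ('pump', 'valve')
```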
Prefer LEFT JOIN for small lookup tables
If you join a RAW table with a very small table or a single-row table and there is no ON clause, use LEFT JOIN or inline the constant values. A join without a condition becomes a cartesian join, which can lead to inefficient plans. Switching to LEFT JOIN can significantly improve the query plan and reduce runtime.
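A sketch with hypothetical tables: joining readings against a one-row config table without a condition, versus a LEFT JOIN with an always-true condition:

```sql
-- Cartesian join: no ON clause, so every row pairs with the config row.
select r.*, c.site_name
from mydb.readings r
cross join mydb.site_config c

-- LEFT JOIN variant that tends to produce a better plan:
select r.*, c.site_name
from mydb.readings r
left join mydb.site_config c on true
```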
Filter early at the source
Reduce data volume before Spark reads it. RAW is a key-value store, and filtering happens on the client side in transformations. If you only need a subset of columns from a very wide RAW table, consider:
- Splitting the source upstream into smaller, purpose-built tables.
- Creating a staging table that includes only the columns used by transformations.
Use CTEs for clarity and reuse
Use CTEs (common table expressions) for temporary tables and shared sub-logic. CTEs make complex queries easier to read and maintain, and they can simplify legacy transformations where logic has grown over time. For data modeling sources, keep the logic focused on graph queries and avoid deep CTE chains when they add little value.

Keep joins inside the data modeling framework
When you are writing to data modeling, keep complex joins inside the data modeling framework. Filters and indexes are optimized for graph queries, and you can combine joins with is_new() on the primary source to enrich new entries efficiently.
Manage schema inference explicitly
CDF infers schema from the first 10,000 rows it reads. If those rows are sparse or inconsistent, the inferred schema can be wrong. Workarounds:
- Insert one or more “schema rows” with all expected columns and sort them to the top of the input.
- Use get_json_object to extract fields from JSON payloads when the schema is uneven or evolving.
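For example, get_json_object (a standard Spark SQL function) can pull individual fields out of a JSON column without relying on inferred schema (the table and JSON paths here are hypothetical):

```sql
select
  key,
  get_json_object(payload, '$.sensor.id')   as sensor_id,
  get_json_object(payload, '$.sensor.unit') as sensor_unit
from mydb.raw_events
```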