Prerequisites
- Configure access control for your user or service account.
- Development environment with either:
  - Python with `cognite-sdk` installed, or
  - Node.js/TypeScript with `@cognite/sdk` installed
Example: configure capabilities
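As a placeholder for the capabilities example, here is a minimal sketch of a group-capability payload in Python. The ACL key name (`streamRecordsAcl`) and the scope shape are assumptions; the text only specifies `streamrecords` READ/WRITE access scoped to a space, so check your project's capability reference for the exact identifiers.

```python
# Sketch of group capabilities for Records access. The ACL key and scope
# structure are assumptions; only the READ/WRITE actions and space scoping
# come from the surrounding text.
def records_capabilities(space: str) -> list[dict]:
    """Build capability entries for reading and writing records in one space."""
    scope = {"spaceIdScope": {"spaceIds": [space]}}
    return [
        {"streamRecordsAcl": {"actions": ["READ", "WRITE"], "scope": scope}},
    ]

caps = records_capabilities("scada_alarms")
```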
Build your first Records pipeline
In this section, you’ll build a complete data pipeline using a SCADA alarm management scenario as an example. The pipeline demonstrates the full Records lifecycle: using a mutable stream as a temporary staging area for active data, ingesting and updating records, querying with different endpoints, and archiving finalized records to an immutable stream for permanent storage. This pattern mirrors what you’d deploy in production to manage any high-volume event data where records have a defined lifecycle.
Create a space and define the alarm schema
- Create a space to serve as a namespace for your SCADA alarm data. This keeps your alarm schemas and records organized and separate from other data in your CDF project.
Example: create a space
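A minimal sketch of the space-creation request body in Python. The identifier `scada_alarms` and the descriptive text are example values for this walkthrough, not names from the original.

```python
# Sketch of a "create space" request body. The space identifier
# "scada_alarms" is an example name used throughout this walkthrough.
space_body = {
    "items": [
        {
            "space": "scada_alarms",  # namespace for alarm schemas and records
            "name": "SCADA alarms",
            "description": "Namespace for SCADA alarm schemas and records",
        }
    ]
}
```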
- Create and define the container that specifies what properties each alarm record will have. This schema includes the fields you’d typically see in SCADA systems: alarm identifiers, severity levels, numeric values, priorities, acknowledgment status, and timestamps.
Set the container’s `usedFor` property to "record". Records can only be ingested into containers specifically designated for records.
Example: create SCADA alarm container
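A sketch of the container definition as a Python dict. The property names and types are illustrative choices based on the fields the text lists (identifiers, severity, value, priority, acknowledgment, timestamps); the one requirement taken directly from the text is `usedFor` set to `"record"`.

```python
# Sketch of a container definition for alarm records. Property names and
# types are illustrative; the key requirement from the text is
# usedFor="record", since only record containers accept record ingestion.
container_body = {
    "space": "scada_alarms",
    "externalId": "alarm",
    "usedFor": "record",  # containers must be designated for records
    "properties": {
        "alarmId":      {"type": {"type": "text"}},
        "severity":     {"type": {"type": "text"}},
        "value":        {"type": {"type": "float64"}},
        "priority":     {"type": {"type": "int32"}},
        "acknowledged": {"type": {"type": "boolean"}},
        "timestamp":    {"type": {"type": "timestamp"}},
    },
}
```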
Create streams for different lifecycles
- Create a mutable stream for active alarms that need real-time updates, like when an operator acknowledges an alarm. This stream uses the `BasicLiveData` template, which is optimized for data that changes frequently.
Example: create a mutable stream
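A hypothetical stream-creation body; the stream ID `active-alarms` is an example name, and only the `BasicLiveData` template comes from the text.

```python
# Sketch of a "create stream" request body for the active-alarm staging
# area. The externalId is an example; the template name is from the text.
mutable_stream = {
    "externalId": "active-alarms",
    "template": "BasicLiveData",  # optimized for frequently changing data
}
```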
- Create an immutable stream for archived alarms that won’t change after they’re written. This stream uses the `ImmutableTestStream` template, which is designed for write-once, read-many scenarios like long-term archival.
Example: create an immutable stream
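The matching sketch for the archive stream; again, the stream ID is an example name and the `ImmutableTestStream` template is the detail taken from the text.

```python
# Sketch of a "create stream" request body for the archive. The externalId
# is an example; the template name is from the text.
immutable_stream = {
    "externalId": "archived-alarms",
    "template": "ImmutableTestStream",  # write-once, read-many archival
}
```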
Ingest records
Each record references its container by `space` and `externalId` to identify which schema the data follows.
- Start by ingesting a couple of hardcoded alarm records to verify your setup works. This gives you a small dataset to test queries and updates before adding more data.
Example: ingest initial alarm records
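A sketch of an ingest payload in Python. Each record names its container source by `space` and `externalId`, as described above; the overall request shape and the helper function are illustrative assumptions.

```python
# Sketch of an ingest payload. Each record points at the example "alarm"
# container by space/externalId so the API knows which schema applies.
def alarm_record(external_id: str, severity: str, value: float, priority: int) -> dict:
    """Build one alarm record targeting the example 'alarm' container."""
    return {
        "space": "scada_alarms",
        "externalId": external_id,
        "sources": [
            {
                "source": {"type": "container", "space": "scada_alarms", "externalId": "alarm"},
                "properties": {
                    "alarmId": external_id,
                    "severity": severity,
                    "value": value,
                    "priority": priority,
                    "acknowledged": False,  # new alarms start unacknowledged
                },
            }
        ],
    }

ingest_body = {"items": [
    alarm_record("alarm-001", "HIGH", 98.6, 1),
    alarm_record("alarm-002", "LOW", 12.3, 4),
]}
```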
- Add 100 random alarm records to simulate a realistic dataset. This larger dataset will make your queries and aggregations more meaningful and help you see how Records handles volume.
Example: ingest random alarm records for larger dataset
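A runnable sketch that generates 100 random alarm property sets to simulate a realistic dataset. The severity labels, value ranges, and acknowledgment ratio are made up for illustration; in practice you would wrap each entry in the same record envelope used for the initial ingest.

```python
import random

# Generate 100 random alarm property sets. All ranges and labels here are
# illustrative test data, not values from the original tutorial.
random.seed(42)  # reproducible test data
SEVERITIES = ["LOW", "MEDIUM", "HIGH", "CRITICAL"]

def random_alarms(n: int = 100) -> list[dict]:
    return [
        {
            "externalId": f"alarm-{i:03d}",
            "severity": random.choice(SEVERITIES),
            "value": round(random.uniform(0.0, 100.0), 2),
            "priority": random.randint(1, 5),
            "acknowledged": random.random() < 0.3,  # roughly 30% pre-acknowledged
        }
        for i in range(n)
    ]

batch = random_alarms()
```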
Update records (mutable streams only)
Use the `upsert` endpoint to update existing records by their `space` and `externalId`.
To update a record, you need its `externalId`. In practice, you’d typically get it from a previous query or store it when you first ingest the record. For this example, you’ll update one of the alarm records you ingested earlier to mark it as acknowledged.
Example: update an alarm to acknowledged status
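A sketch of an upsert body that flips one record to acknowledged. The record is addressed by `space` plus `externalId`, as described above; the exact body shape is an assumption.

```python
# Sketch of an upsert payload marking alarm-001 as acknowledged. Only the
# changed property is shown; the request shape is illustrative.
upsert_body = {
    "items": [
        {
            "space": "scada_alarms",
            "externalId": "alarm-001",
            "sources": [
                {
                    "source": {"type": "container", "space": "scada_alarms", "externalId": "alarm"},
                    "properties": {"acknowledged": True},
                }
            ],
        }
    ]
}
```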
Query records
- For interactive queries, use the `filter` endpoint.
- For large datasets with pagination, use the `sync` endpoint.
- For statistical analysis, use the `aggregate` endpoint.
The `lastUpdatedTime` filter is mandatory for `filter` and `aggregate` queries on immutable streams. This filter defines the time range for retrieving records:
- `gt` (greater than): the start of the time range (required for immutable streams).
- `lt` (less than): the end of the time range (optional; defaults to the current time if not specified).
The width of the time range is limited by the stream’s `maxFilteringInterval` setting, which is determined by the stream template you chose when the stream was created. This limit applies to the span between `gt` and `lt`, not to how far back in time you can reach: you can query any historical period as long as each request stays within the interval. To query data spanning more than this interval, split your requests into adjacent time windows. See Query time range limits for details and examples. For mutable streams, `lastUpdatedTime` is optional, but using it improves query performance.
- Use the `filter` endpoint for interactive applications, dashboards, and any scenario where you expect a relatively small, bounded result set. It returns an unpaginated result set with a maximum of 1,000 records per request and supports custom sorting on any property. This endpoint is ideal for queries like “show me the top 10 unacknowledged alarms sorted by priority.”
Example: filter unacknowledged alarms by priority
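An illustrative filter request for the “top 10 unacknowledged alarms by priority” query. The property-path and sort syntax is an assumption modeled on CDF-style filter DSLs.

```python
# Sketch of a filter request: unacknowledged alarms, sorted by priority,
# capped at 10 results. Property paths follow [space, container, property];
# this addressing scheme is an assumption for illustration.
filter_body = {
    "filter": {
        "equals": {
            "property": ["scada_alarms", "alarm", "acknowledged"],
            "value": False,
        }
    },
    "sort": [{"property": ["scada_alarms", "alarm", "priority"], "direction": "ascending"}],
    "limit": 10,
}
```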
- Use the `sync` endpoint for batch processing, data exports, or any application that needs to retrieve a large number of records efficiently. It returns a paginated result set: you iterate through the data by making subsequent requests with the cursor returned in the previous response. The sync process is complete when the API response has `hasNext` set to `false`. The sync endpoint supports a maximum of 1,000 records per page and doesn’t support custom sorting. Read throughput is governed by the stream settings defined by the stream template you chose when creating the stream. This endpoint is designed for workflows like “export all recent alarm events for analysis” or for implementing incremental data pipelines.
Example: sync recent alarm records
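The cursor loop described above can be sketched in Python. Here `fetch_page` stands in for the real `/sync` call and simulates a three-page response so the control flow is runnable; the `items`, `nextCursor`, and `hasNext` fields mirror the response described in the text.

```python
# Simulated three-page /sync response keyed by cursor. In a real pipeline,
# fetch_page would POST to the sync endpoint with the current cursor.
PAGES = {
    None: {"items": [1, 2], "nextCursor": "c1", "hasNext": True},
    "c1": {"items": [3, 4], "nextCursor": "c2", "hasNext": True},
    "c2": {"items": [5], "nextCursor": "c3", "hasNext": False},
}

def fetch_page(cursor):
    return PAGES[cursor]

def sync_all():
    records, cursor = [], None
    while True:
        page = fetch_page(cursor)   # POST /sync with the current cursor
        records.extend(page["items"])
        if not page["hasNext"]:     # sync is complete when hasNext is false
            break
        cursor = page["nextCursor"] # continue from the returned cursor
    return records
```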
- Use the `aggregate` endpoint to perform statistical analysis on your records, such as counts, averages, sums, and other aggregations. It returns aggregated results based on the specified aggregation functions and groupings, optimized for analytical queries that summarize large datasets without retrieving individual records. This endpoint is ideal for generating reports like “count of alarms by severity” or “average alarm value and priority statistics.”
Example: overall alarm statistics
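An illustrative aggregate request for overall statistics. The aggregation names (`count`, `avg`) mirror the functions the text mentions; the exact body shape is an assumption.

```python
# Sketch of an aggregate request: overall count plus average value and
# priority across all alarm records. Body shape is illustrative.
aggregate_body = {
    "aggregates": [
        {"count": {"property": ["scada_alarms", "alarm", "alarmId"]}},
        {"avg": {"property": ["scada_alarms", "alarm", "value"]}},
        {"avg": {"property": ["scada_alarms", "alarm", "priority"]}},
    ]
}
```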
Example: per-alarm statistics with nested aggregations
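A sketch of the nested variant: group records by alarm, then aggregate inside each group. The `uniqueValues` grouping name and nesting shape are assumptions for illustration.

```python
# Sketch of a nested aggregation: per-alarm groups with inner statistics.
# The grouping keyword and nesting structure are assumptions.
nested_aggregate_body = {
    "aggregates": [
        {
            "uniqueValues": {
                "property": ["scada_alarms", "alarm", "alarmId"],
                "aggregates": [
                    {"avg": {"property": ["scada_alarms", "alarm", "value"]}},
                    {"max": {"property": ["scada_alarms", "alarm", "priority"]}},
                ],
            }
        }
    ]
}
```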
Archive acknowledged alarms (stream-to-stream pattern)
- Hot/cold storage: keep active data in mutable streams, archive historical data in immutable streams
- Data lifecycle management: move processed records to long-term storage automatically
- Performance optimization: keep active streams lean by moving old data to archives
Read from the mutable stream using the `sync` endpoint with cursor-based pagination. The cursor tracks your position in the stream, allowing you to resume from where you left off if processing fails or is interrupted.
Example: move acknowledged alarms with pagination
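The archive step can be sketched as a cursor loop that reads acknowledged alarms from the mutable stream and re-ingests them into the immutable archive. `read_page` and `write_archive` are stand-ins for the real sync and ingest calls; the simulated pages make the control flow runnable.

```python
# Archive loop sketch: page through acknowledged alarms and copy each page
# into the immutable stream. The callables abstract the real API calls.
def archive_acknowledged(read_page, write_archive):
    cursor, moved = None, 0
    while True:
        page = read_page(cursor)          # sync filtered to acknowledged == True
        if page["items"]:
            write_archive(page["items"])  # ingest into the immutable stream
            moved += len(page["items"])
        if not page["hasNext"]:
            break
        cursor = page["nextCursor"]       # persist this cursor to resume after failures
    return moved

# Two simulated pages of acknowledged alarm IDs for demonstration.
_pages = {
    None: {"items": ["alarm-001", "alarm-007"], "nextCursor": "c1", "hasNext": True},
    "c1": {"items": ["alarm-012"], "nextCursor": "c2", "hasNext": False},
}
archived: list = []
moved = archive_acknowledged(lambda cur: _pages[cur], archived.extend)
```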
Example: verify final distribution across streams
Advanced: Build a stream-to-stream transformation pipeline
A powerful architectural pattern for Records is creating data pipelines that move and transform data between streams. This separates raw data ingestion from creating clean, standardized datasets for consumption. You’ll build a three-stage pipeline: ingest raw data to a source stream, transform it using a Hosted REST Extractor, and consume the standardized data from a destination stream.
The `DataStaging` stream template is planned for a future release. Until then, you can use the `ImmutableTestStream` template to test and implement this pattern.
The extractor configuration has four parts:
- Source: connects to the CDF API to read from the source stream’s `/sync` endpoint.
- Destination: provides credentials to write transformed records back to CDF.
- Mapping: transforms records from the source stream format to the destination stream format.
- Job: ties everything together and runs on a schedule with pagination and incremental load handling.
Create the source
Replace <your-cdf-cluster>, <your-client-id>, <your-client-secret>, <your-token-url>, and <your-scopes> with your actual values.
The credentials need the `streamrecords:READ` capability scoped to the source stream’s space.
Create the destination
The credentials need the `streamrecords:WRITE` capability scoped to the destination stream’s space.
Create the mapping
The source stream’s `/sync` endpoint returns a response containing an `items` array. The mapping iterates over these items and transforms each one.
For writing to Records streams, the output must include:
- `type`: set to "immutable_record" for immutable streams.
- `space`: the space where the destination stream exists.
- `stream`: the external ID of the destination stream.
- `externalId`: a unique identifier for each record.
- `sources`: an array of container sources with their properties.
- `beta`: set to true, as the records destination is currently in beta.
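The required output fields listed above can be illustrated as one transformed record, shown here as a Python dict. The field names come from the list; the values are example data from this walkthrough.

```python
# Shape of one transformed record as the destination expects it, built
# from the required fields listed above. Values are illustrative.
mapped_record = {
    "type": "immutable_record",  # required for immutable streams
    "space": "scada_alarms",
    "stream": "archived-alarms",
    "externalId": "alarm-001",
    "sources": [
        {
            "source": {"type": "container", "space": "scada_alarms", "externalId": "alarm"},
            "properties": {"alarmId": "alarm-001", "acknowledged": True},
        }
    ],
    "beta": True,  # records destination is currently in beta
}
```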
Create the job
The job calls the `/sync` endpoint on a schedule, applies the mapping, and writes to the destination. Configure incremental load to initialize the cursor on the first run, and pagination to handle cursor-based iteration through records.
| Parameter | Description |
|---|---|
| `interval` | How often the job runs (for example, `1h`, `15m`, `1d`). |
| `path` | The `/sync` endpoint path for the source stream. |
| `body` | The base request body with limit and filter parameters. |
| `incremental_load` | Sets `initializeCursor` on the first request to start syncing from a specific point (for example, `30d-ago`). |
| `pagination` | Uses `nextCursor` from the response to paginate through all records. Returns `null` to stop when no more records exist. |
The `incremental_load` expression merges the base body with the `initializeCursor` parameter. The `pagination` expression checks whether `nextCursor` exists in the response and either continues with the cursor or returns `null` to stop pagination.
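The parameter table above can be summarized as a configuration sketch. Everything here is illustrative: the project and stream names are walkthrough examples, and the expression strings are pseudo-expressions conveying the merge/continue logic described in the text, not the extractor's actual expression syntax.

```python
# Sketch of the job configuration, mirroring the parameter table. The
# path uses example project/stream names; the expression strings are
# pseudo-expressions, not real extractor syntax.
job_config = {
    "interval": "1h",  # run the sync job hourly
    "path": "/api/v1/projects/my-project/streams/active-alarms/records/sync",
    "body": {"limit": 1000, "filter": {}},
    # First run: merge initializeCursor into the base body (30 days back).
    "incremental_load": {"body": 'merge(body, {"initializeCursor": "30d-ago"})'},
    # Later runs: continue with nextCursor, or stop with null when absent.
    "pagination": {"body": 'response.nextCursor ? merge(body, {"cursor": response.nextCursor}) : null'},
}
```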