Learn Records core concepts and patterns through a hands-on tutorial. Follow along with a SCADA alarm management example that demonstrates schemas, streams, ingestion, querying, and stream-to-stream pipelines.
This hands-on guide teaches you the core patterns for working with Records in Cognite Data Fusion (CDF). You'll learn how to create schemas, set up streams, ingest high-volume event data, query records efficiently, and build data pipelines between streams.

Throughout this guide, SCADA alarm management serves as a practical example, but these patterns apply to any high-volume industrial data: multi-value sensor readings, event frames, equipment logs, telemetry events, and more. The Records service is built specifically for ingesting billions of events and querying them efficiently.

Records are designed primarily for immutable data like logs, events, and historical records that don't change after ingestion. If your data needs updates during its lifecycle (like active alarms awaiting acknowledgment), you can use a mutable stream as a transitional stage and then archive the finalized records to an immutable stream. For data that requires frequent updates and will never be archived, consider using data modeling nodes instead.

By the end of this guide, you'll have a working example that handles the full lifecycle: ingesting data, updating records during their active phase, querying efficiently, and archiving completed records to immutable storage.

If you're new to Records concepts like spaces, containers, and streams, see Records and streams for background.
In this section, you'll build a complete data pipeline using a SCADA alarm management scenario as an example. This pipeline demonstrates the full Records lifecycle: using a mutable stream as a temporary staging area for active data, ingesting and updating records, querying with different endpoints, and archiving finalized records to an immutable stream for permanent storage.

This pattern mirrors what you'd deploy in production for managing any high-volume event data where records have a defined lifecycle.
1
Create a space and define the alarm schema
Start by defining the schema for your alarm records. You’ll create a space to organize your data and a container that defines the structure of each alarm event. If you’re new to these concepts, check out the data modeling concepts for background.
Create a space to serve as a namespace for your SCADA alarm data. This keeps your alarm schemas and records organized and separate from other data in your CDF project.
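For example, you can create the space with the Python SDK. The space ID scada-alarms matches the container definition in the next step; the name and description below are only illustrative.

from cognite.client import CogniteClient
from cognite.client.data_classes.data_modeling import SpaceApply

client = CogniteClient()

# Create (or update) the space that will hold the SCADA alarm schema and records
space = client.data_modeling.spaces.apply(
    SpaceApply(
        space="scada-alarms",
        name="SCADA Alarms",
        description="Namespace for SCADA alarm schemas and records",
    )
)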
Create and define the container that specifies what properties each alarm record will have. This schema includes the fields you’d typically see in SCADA systems: alarm identifiers, severity levels, numeric values, priorities, acknowledgment status, and timestamps.
When creating containers for records, you must set usedFor to "record". Records can only be ingested into containers specifically designated for records.
Example: create SCADA alarm container
from cognite.client import CogniteClient
from cognite.client.data_classes.data_modeling import (
    ContainerApply,
    ContainerProperty,
    Text,
    Float64,
    Int64,
    Boolean,
    Timestamp,
)

client = CogniteClient()

container_id = "alarm_events"

container = client.data_modeling.containers.apply(
    ContainerApply(
        space="scada-alarms",
        external_id=container_id,
        name="SCADA Alarm Events",
        description="Container for SCADA alarm event data",
        used_for="record",
        properties={
            # Text property - alarm identifier
            "alarm_id": ContainerProperty(
                type=Text(is_list=False),
                nullable=False,
                name="Alarm ID",
                description="Unique identifier for the alarm",
            ),
            # Text property - alarm severity
            "severity": ContainerProperty(
                type=Text(is_list=False),
                nullable=False,
                name="Severity",
                description="Alarm severity level (CRITICAL, HIGH, MEDIUM, LOW)",
            ),
            # Float property - alarm value
            "value": ContainerProperty(
                type=Float64(is_list=False),
                nullable=False,
                name="Value",
                description="Numeric value associated with the alarm condition",
            ),
            # Int property - priority score
            "priority": ContainerProperty(
                type=Int64(is_list=False),
                nullable=False,
                name="Priority",
                description="Priority score (1-100)",
            ),
            # Boolean property - acknowledged status
            "is_acknowledged": ContainerProperty(
                type=Boolean(is_list=False),
                nullable=False,
                name="Acknowledged Status",
                description="True if alarm has been acknowledged",
            ),
            # Timestamp property - when alarm occurred
            "timestamp": ContainerProperty(
                type=Timestamp(is_list=False),
                nullable=False,
                name="Timestamp",
                description="Time when the alarm occurred",
            ),
        },
    )
)
2
Create streams for different lifecycles
Now create the streams that will hold your alarm records. Streams need a stream settings template that defines their behavior, performance characteristics, and lifecycle policies. For this pipeline, you'll create two streams to handle different stages of the alarm lifecycle.

Choose your stream template based on data lifecycle and throughput requirements, not by record type or source system. For detailed specifications of each template's limits, see the Streams API documentation.
Create a mutable stream for active alarms that need real-time updates, like when an operator acknowledges an alarm. This stream uses the BasicLiveData template, which is optimized for data that changes frequently.
Create an immutable stream for archived alarms that won’t change after they’re written. This stream uses the ImmutableTestStream template, which is designed for write-once, read-many scenarios like long-term archival.
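Example: create the two streams
The sketch below assumes stream creation is a POST to the streams endpoint with an external ID and a template reference. The exact request body, in particular how the template is specified, is an assumption here, so confirm it against the Streams API documentation before running this. The stream IDs match the ones used in the rest of this guide.

from cognite.client import CogniteClient

client = CogniteClient()

streams_url = f"/api/v1/projects/{client.config.project}/streams"

# Mutable stream for active alarms.
# NOTE: the body shape (items wrapper and template field) is an assumption; check the Streams API docs.
client.post(
    url=streams_url,
    json={
        "items": [
            {
                "externalId": "alarm_data_live_stream",
                "settings": {"template": "BasicLiveData"},
            }
        ]
    },
)

# Immutable stream for archived alarms.
client.post(
    url=streams_url,
    json={
        "items": [
            {
                "externalId": "alarm_data_archive_stream",
                "settings": {"template": "ImmutableTestStream"},
            }
        ]
    },
)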
3
Ingest alarm records
Now you'll ingest alarm records into your mutable stream. In production, you'd typically use a Cognite extractor that reads directly from your source system and writes to Records. For this tutorial, you'll use the API directly to simulate the ingestion process.
The Records service does not support onboarding data using RAW and Transformations. Always ingest data directly into a Records stream, either through one of the supported extractors or, as in this tutorial, through the API.
When writing records to the API, you need to specify the target stream identifier and include the container’s space.externalId to identify which schema the data follows.
Start by ingesting a couple of hardcoded alarm records to verify your setup works. This gives you a small dataset to test queries and updates before adding more data.
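Example: ingest two hardcoded alarm records
A minimal sketch of this first ingestion. The request format is the same as in the larger example below; the property values are illustrative and line up with the records referenced later in this guide.

from cognite.client import CogniteClient
import uuid

client = CogniteClient()

space_id = "scada-alarms"
container_id = "alarm_events"
mutable_stream_id = "alarm_data_live_stream"

# Two hardcoded alarm records to verify the setup (values are illustrative)
hardcoded_records = [
    {
        "alarm_id": "ALARM-PRESS-001",
        "severity": "CRITICAL",
        "value": 125.8,
        "priority": 95,
        "is_acknowledged": False,
        "timestamp": "2025-10-22T10:00:00.000Z",
    },
    {
        "alarm_id": "ALARM-TEMP-042",
        "severity": "MEDIUM",
        "value": 88.3,
        "priority": 60,
        "is_acknowledged": False,
        "timestamp": "2025-10-22T10:00:00.000Z",
    },
]

# Wrap each record with its target space, a unique external ID, and the container source
items = [
    {
        "space": space_id,
        "externalId": str(uuid.uuid4()),
        "sources": [
            {
                "source": {
                    "type": "container",
                    "space": space_id,
                    "externalId": container_id,
                },
                "properties": record,
            }
        ],
    }
    for record in hardcoded_records
]

# Write the records to the mutable stream
response = client.post(
    url=f"/api/v1/projects/{client.config.project}/streams/{mutable_stream_id}/records",
    json={"items": items},
)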
Add 100 random alarm records to simulate a realistic dataset. This larger dataset will make your queries and aggregations more meaningful and help you see how Records handles volume.
Example: ingest random alarm records for larger dataset
from cognite.client import CogniteClient
from datetime import datetime, timedelta
import random
import uuid

client = CogniteClient()

space_id = "scada-alarms"
container_id = "alarm_events"
mutable_stream_id = "alarm_data_live_stream"

# Generate 100 random alarm records
alarm_ids = [
    "ALARM-PRESS-001",
    "ALARM-TEMP-042",
    "ALARM-FLOW-013",
    "ALARM-VIBR-088",
    "ALARM-LEVEL-055",
]
severities = ["CRITICAL", "HIGH", "MEDIUM", "LOW"]
base_time = datetime.now() - timedelta(hours=2)

random_records = [
    {
        "alarm_id": random.choice(alarm_ids),
        "severity": random.choice(severities),
        "value": round(random.uniform(50.0, 150.0), 2),
        "priority": random.randint(20, 100),
        "is_acknowledged": random.choice([True, False]),
        "timestamp": (base_time + timedelta(minutes=i)).strftime("%Y-%m-%dT%H:%M:%S.000Z"),
    }
    for i in range(100)
]

# Prepare random records for ingestion
items = []
for record in random_records:
    items.append(
        {
            "space": space_id,
            "externalId": str(uuid.uuid4()),
            "sources": [
                {
                    "source": {
                        "type": "container",
                        "space": space_id,
                        "externalId": container_id,
                    },
                    "properties": record,
                }
            ],
        }
    )

# Ingest random records
response = client.post(
    url=f"/api/v1/projects/{client.config.project}/streams/{mutable_stream_id}/records",
    json={"items": items},
)
4
Update records (mutable streams only)
One advantage of mutable streams is the ability to update records after they're written. This is essential for scenarios like acknowledging alarms when an operator responds, updating status fields as conditions change, or correcting data errors. Use the upsert endpoint to update existing records by their space and externalId.

To update a record, you need its externalId. In practice, you'd typically get this from a previous query or store it when you first ingest the record. For this example, you'll update one of the alarm records you ingested earlier to mark it as acknowledged.
Example: update an alarm to acknowledged status
from cognite.client import CogniteClient

client = CogniteClient()

space_id = "scada-alarms"
container_id = "alarm_events"
mutable_stream_id = "alarm_data_live_stream"

# Assuming you have the external_id of a record you want to update
# In practice, you would get this from a previous query or store it during ingestion
record_external_id = "your-record-external-id"  # Replace with actual ID

# Update the record to set is_acknowledged to True
updated_record = {
    "alarm_id": "ALARM-TEMP-042",
    "severity": "MEDIUM",
    "value": 88.3,
    "priority": 60,
    "is_acknowledged": True,
    "timestamp": "2025-10-22T10:00:00.000Z",
}

response = client.post(
    url=f"/api/v1/projects/{client.config.project}/streams/{mutable_stream_id}/records/upsert",
    json={
        "items": [
            {
                "space": space_id,
                "externalId": record_external_id,
                "sources": [
                    {
                        "source": {
                            "type": "container",
                            "space": space_id,
                            "externalId": container_id,
                        },
                        "properties": updated_record,
                    }
                ],
            }
        ]
    },
)
5
Query records
Next, you’ll query your alarm records using the Records API. The API provides three endpoints for consuming data, each optimized for different use cases:
For interactive queries, use the filter endpoint.
For large datasets with pagination, use the sync endpoint.
For statistical analysis, use the aggregate endpoint.
The query syntax for filtering is similar across all endpoints.
When querying records, we recommend that you use the hasData filter so you only retrieve records that contain data in the container you're querying. A record can exist in a stream without having data for every container; the hasData filter keeps queries efficient by excluding those records.
Time-based filtering with lastUpdatedTime
The lastUpdatedTime filter is mandatory for filter and aggregate queries on immutable streams. This filter defines the time range for retrieving records:
gt (greater than): The start of the time range (required for immutable streams)
lt (less than): The end of the time range (optional, defaults to current time if not specified)
The maximum time range you can query is defined by the stream's maxFilteringInterval setting, which is determined by the stream template you chose when the stream was created. You can check the limit by retrieving the stream's settings at /api/v1/projects/{project}/streams/{streamId} and inspecting settings.limits.maxFilteringInterval.

For mutable streams, lastUpdatedTime is optional, but we recommend that you use it for improved query performance.
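For example, you can check the limit with the SDK's generic GET helper. The response keys below follow the path described above; if your API version wraps the stream object differently, inspect the raw response.

from cognite.client import CogniteClient

client = CogniteClient()

stream_id = "alarm_data_live_stream"

# Retrieve the stream and inspect its maximum filtering interval
response = client.get(
    url=f"/api/v1/projects/{client.config.project}/streams/{stream_id}",
)
stream = response.json()
print(stream["settings"]["limits"]["maxFilteringInterval"])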
Use the filter endpoint for interactive applications, dashboards, and any scenario where you expect a relatively small, bounded result set. It returns an unpaginated result set with a maximum of 1,000 records per request and supports custom sorting on any property.

This endpoint is ideal for queries like “show me the top 10 unacknowledged alarms sorted by priority.”
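Example: filter unacknowledged alarms by priority
A sketch of the “top 10 unacknowledged alarms sorted by priority” query. The request body mirrors the sync examples later in this guide, but the /records/filter path and the shape of the sort clause are assumptions here, so verify them against the Records API reference.

from cognite.client import CogniteClient

client = CogniteClient()

space_id = "scada-alarms"
container_id = "alarm_events"
mutable_stream_id = "alarm_data_live_stream"

# Filter for unacknowledged alarms, sorted by priority
# NOTE: the /records/filter path and sort syntax are assumptions; see the Records API reference.
response = client.post(
    url=f"/api/v1/projects/{client.config.project}/streams/{mutable_stream_id}/records/filter",
    json={
        "sources": [
            {
                "source": {
                    "type": "container",
                    "space": space_id,
                    "externalId": container_id,
                },
                "properties": ["*"],
            }
        ],
        "filter": {
            "and": [
                {
                    "hasData": [
                        {
                            "type": "container",
                            "space": space_id,
                            "externalId": container_id,
                        }
                    ]
                },
                {
                    "equals": {
                        "property": [space_id, container_id, "is_acknowledged"],
                        "value": False,
                    }
                },
            ]
        },
        "sort": [
            {
                "property": [space_id, container_id, "priority"],
                "direction": "descending",
            }
        ],
        "limit": 10,
    },
)

for item in response.json()["items"]:
    props = item["properties"][space_id][container_id]
    print(props["alarm_id"], props["severity"], props["priority"])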
Use the sync endpoint for batch processing, data exports, or any application that needs to retrieve a large number of records efficiently. It returns a paginated result set: you iterate through the data by making subsequent requests with the cursor returned in the previous response. The sync process is complete when the API response has hasNext set to false.

The sync endpoint supports a maximum of 1,000 records per page and doesn't support custom sorting. The read throughput is governed by the stream settings defined by the stream template you chose when creating the stream.

This endpoint is designed for workflows like “export all recent alarm events for analysis” or for implementing incremental data pipelines.
Example: sync recent alarm records
from cognite.client import CogniteClient
from tabulate import tabulate

client = CogniteClient()

space_id = "scada-alarms"
container_id = "alarm_events"
mutable_stream_id = "alarm_data_live_stream"

# Sync records from the last 2 days
response = client.post(
    url=f"/api/v1/projects/{client.config.project}/streams/{mutable_stream_id}/records/sync",
    json={
        "sources": [
            {
                "source": {
                    "type": "container",
                    "space": space_id,
                    "externalId": container_id,
                },
                "properties": ["*"],
            }
        ],
        "filter": {
            "hasData": [
                {
                    "type": "container",
                    "space": space_id,
                    "externalId": container_id,
                }
            ]
        },
        "initializeCursor": "2d-ago",
        "limit": 10,
    },
)

results = response.json()
items = results.get("items", [])

# Display as table
table_data = []
for item in items[:5]:  # Show first 5
    props = item["properties"][space_id][container_id]
    ack_status = "ACK" if props["is_acknowledged"] else "UNACK"
    table_data.append([
        props["alarm_id"],
        props["severity"],
        ack_status,
        props["value"],
        props["timestamp"],
    ])

print(f"Sync returned {len(items)} records (showing first 5):")
print(tabulate(
    table_data,
    headers=["Alarm ID", "Severity", "Status", "Value", "Timestamp"],
    tablefmt="github",
))

if "nextCursor" in results:
    cursor = results["nextCursor"]
    cursor_preview = f"{cursor[:5]}..." if len(cursor) > 5 else cursor
    print(f"\nNext cursor for incremental sync: {cursor_preview}")
Expected output:
Sync returned 10 records (showing first 5):
| Alarm ID        | Severity   | Status   | Value   | Timestamp                |
|-----------------|------------|----------|---------|--------------------------|
| ALARM-PRESS-001 | CRITICAL   | UNACK    | 125.8   | 2025-10-22T10:00:00.000Z |
| ALARM-TEMP-042  | LOW        | UNACK    | 98.78   | 2025-10-24T11:27:54.000Z |
| ALARM-VIBR-088  | CRITICAL   | ACK      | 122.78  | 2025-10-24T11:28:54.000Z |
| ALARM-VIBR-088  | MEDIUM     | ACK      | 140.56  | 2025-10-24T11:29:54.000Z |
| ALARM-LEVEL-055 | CRITICAL   | ACK      | 50.59   | 2025-10-24T11:30:54.000Z |

Next cursor for incremental sync: 08c0e...
Use the aggregate endpoint to perform statistical analysis on your records, such as counts, averages, sums, and other aggregations. It returns aggregated results based on the specified aggregation functions and groupings, optimized for analytical queries that require summarizing large datasets without retrieving individual records.

This endpoint is ideal for generating reports like “count of alarms by severity” or “average alarm value and priority statistics.”
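Example: count alarms by severity
A sketch of a “count of alarms by severity” aggregation. The /records/aggregate path and the aggregates/groupBy body shape are assumptions here, so check the Records API reference for the exact syntax before running this.

from cognite.client import CogniteClient

client = CogniteClient()

space_id = "scada-alarms"
container_id = "alarm_events"
mutable_stream_id = "alarm_data_live_stream"

# Count alarms grouped by severity.
# NOTE: the aggregates/groupBy body shape below is an assumption; see the Records API reference.
response = client.post(
    url=f"/api/v1/projects/{client.config.project}/streams/{mutable_stream_id}/records/aggregate",
    json={
        "filter": {
            "hasData": [
                {
                    "type": "container",
                    "space": space_id,
                    "externalId": container_id,
                }
            ]
        },
        "aggregates": [
            {"count": {"property": [space_id, container_id, "alarm_id"]}}
        ],
        "groupBy": [[space_id, container_id, "severity"]],
    },
)

print(response.json())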
6
Archive acknowledged alarms to the immutable stream
The final step of this tutorial is to implement a common Records pattern: moving data between streams based on lifecycle stages. You'll archive acknowledged alarms by moving them from the mutable stream to the immutable stream. This stream-to-stream pattern is useful for:
Hot/cold storage: keep active data in mutable streams, archive historical data in immutable streams
Data lifecycle management: move processed records to long-term storage automatically
Performance optimization: keep active streams lean by moving old data to archives
The key to reliable stream-to-stream operations is using the sync endpoint with cursor-based pagination. The cursor tracks your position in the stream, allowing you to resume from where you left off if processing fails or is interrupted.
Example: move acknowledged alarms with pagination
from cognite.client import CogniteClient

client = CogniteClient()

space_id = "scada-alarms"
container_id = "alarm_events"
mutable_stream_id = "alarm_data_live_stream"
immutable_stream_id = "alarm_data_archive_stream"

# In a production system, you would persist this cursor between runs
stored_cursor = None

# Pagination settings
BATCH_SIZE = 10
total_records_moved = 0
batch_number = 0

print("Starting pagination loop to archive acknowledged alarms...")

# Keep paginating until there are no more acknowledged records
while True:
    batch_number += 1
    print(f"\n--- Batch {batch_number} ---")

    # Sync acknowledged records from MUTABLE stream
    sync_response = client.post(
        url=f"/api/v1/projects/{client.config.project}/streams/{mutable_stream_id}/records/sync",
        json={
            "sources": [
                {
                    "source": {
                        "type": "container",
                        "space": space_id,
                        "externalId": container_id,
                    },
                    "properties": ["*"],
                }
            ],
            # Use stored cursor if available, otherwise initialize from 2 days ago
            **(
                {"cursor": stored_cursor}
                if stored_cursor
                else {"initializeCursor": "2d-ago"}
            ),
            "filter": {
                "and": [
                    {
                        "hasData": [
                            {
                                "type": "container",
                                "space": space_id,
                                "externalId": container_id,
                            }
                        ]
                    },
                    {
                        "equals": {
                            "property": [space_id, container_id, "is_acknowledged"],
                            "value": True,
                        }
                    },
                ]
            },
            "limit": BATCH_SIZE,
        },
    )

    sync_results = sync_response.json()
    acknowledged_records = sync_results.get("items", [])
    next_cursor = sync_results["nextCursor"]
    has_next = sync_results["hasNext"]

    print(f"Found {len(acknowledged_records)} acknowledged records in this batch")
    print(f"More data immediately available: {has_next}")

    # If no records found, we're done
    if len(acknowledged_records) == 0:
        print("No more acknowledged records to move")
        stored_cursor = next_cursor  # Update cursor even when no records found
        break

    # Prepare records for ingestion into immutable stream
    print(f"Ingesting {len(acknowledged_records)} records to IMMUTABLE stream...")

    immutable_items = []
    records_to_delete = []

    for record in acknowledged_records:
        # Extract properties and identifiers
        record_space = record["space"]
        record_external_id = record["externalId"]
        props = record["properties"][space_id][container_id]

        # Store record identifier for deletion
        records_to_delete.append({
            "space": record_space,
            "externalId": record_external_id,
        })

        # Prepare for immutable stream
        immutable_items.append({
            "space": record_space,
            "externalId": record_external_id,
            "sources": [
                {
                    "source": {
                        "type": "container",
                        "space": space_id,
                        "externalId": container_id,
                    },
                    "properties": props,
                }
            ],
        })

    # Ingest into immutable stream
    immutable_res = client.post(
        url=f"/api/v1/projects/{client.config.project}/streams/{immutable_stream_id}/records",
        json={"items": immutable_items},
    )

    # Check if ingest was successful (200/201: sync success, 202: async processing)
    if immutable_res.status_code not in [200, 201, 202]:
        print(f"Warning: Immutable ingest returned status {immutable_res.status_code}")
        print(f"Response: {immutable_res.text}")
        break
    else:
        print(f"✓ Ingested {len(immutable_items)} records to IMMUTABLE stream")

    # Delete acknowledged records from MUTABLE stream
    print(f"Deleting {len(records_to_delete)} records from MUTABLE stream...")
    delete_res = client.post(
        url=f"/api/v1/projects/{client.config.project}/streams/{mutable_stream_id}/records/delete",
        json={"items": records_to_delete},
    )
    print(f"✓ Deleted {len(records_to_delete)} records from MUTABLE stream")

    # IMPORTANT: Only update cursor after BOTH operations succeed
    stored_cursor = next_cursor
    total_records_moved += len(acknowledged_records)

    cursor_preview = f"{stored_cursor[:20]}..." if len(stored_cursor) > 20 else stored_cursor
    print(f"✓ Cursor updated: {cursor_preview}")
    print(f"Total records moved so far: {total_records_moved}")

print(f"\n{'=' * 70}")
print("PAGINATION COMPLETE")
print(f"{'=' * 70}")
print(f"Total batches processed: {batch_number}")
print(f"Total records moved: {total_records_moved}")
print(f"{'=' * 70}")
Expected output:
Starting pagination loop to archive acknowledged alarms...

--- Batch 1 ---
Found 10 acknowledged records in this batch
More data immediately available: True
Ingesting 10 records to IMMUTABLE stream...
✓ Ingested 10 records to IMMUTABLE stream
Deleting 10 records from MUTABLE stream...
✓ Deleted 10 records from MUTABLE stream
✓ Cursor updated: 08c0b89cecbeb1dab818...
Total records moved so far: 10

--- Batch 2 ---
Found 10 acknowledged records in this batch
More data immediately available: True
Ingesting 10 records to IMMUTABLE stream...
✓ Ingested 10 records to IMMUTABLE stream
Deleting 10 records from MUTABLE stream...
✓ Deleted 10 records from MUTABLE stream
✓ Cursor updated: 08c0aae0c3c0b1dab818...
Total records moved so far: 20

--- Batch 3 ---
Found 10 acknowledged records in this batch
More data immediately available: True
Ingesting 10 records to IMMUTABLE stream...
✓ Ingested 10 records to IMMUTABLE stream
Deleting 10 records from MUTABLE stream...
✓ Deleted 10 records from MUTABLE stream
✓ Cursor updated: 08c0e5b4cdc4b1dab818...
Total records moved so far: 30

[... batches 4-6 similar, each moving 10 records ...]

--- Batch 7 ---
Found 4 acknowledged records in this batch
More data immediately available: False
Ingesting 4 records to IMMUTABLE stream...
✓ Ingested 4 records to IMMUTABLE stream
Deleting 4 records from MUTABLE stream...
✓ Deleted 4 records from MUTABLE stream
✓ Cursor updated: 0880beeae1c9b1dab818...
Total records moved so far: 64

--- Batch 8 ---
Found 0 acknowledged records in this batch
More data immediately available: False
No more acknowledged records to move

======================================================================
PAGINATION COMPLETE
======================================================================
Total batches processed: 8
Total records moved: 64
======================================================================
Example: verify final distribution across streams
After archiving, verify that the records were moved correctly by querying both streams to see the final distribution of acknowledged and unacknowledged alarms.
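A minimal sketch of one way to run this check, reusing the sync pattern from the archiving step. The output formatting differs slightly from the expected output below, but the counts should match.

from cognite.client import CogniteClient

client = CogniteClient()

space_id = "scada-alarms"
container_id = "alarm_events"


def count_records(stream_id: str) -> dict:
    """Sync every record in a stream and tally it by acknowledgment status."""
    counts = {"total": 0, "acknowledged": 0, "unacknowledged": 0}
    cursor = None
    while True:
        response = client.post(
            url=f"/api/v1/projects/{client.config.project}/streams/{stream_id}/records/sync",
            json={
                "sources": [
                    {
                        "source": {
                            "type": "container",
                            "space": space_id,
                            "externalId": container_id,
                        },
                        "properties": ["*"],
                    }
                ],
                "filter": {
                    "hasData": [
                        {
                            "type": "container",
                            "space": space_id,
                            "externalId": container_id,
                        }
                    ]
                },
                # Use the cursor from the previous page, or start from 2 days ago
                **({"cursor": cursor} if cursor else {"initializeCursor": "2d-ago"}),
                "limit": 1000,
            },
        )
        results = response.json()
        for item in results.get("items", []):
            props = item["properties"][space_id][container_id]
            counts["total"] += 1
            if props["is_acknowledged"]:
                counts["acknowledged"] += 1
            else:
                counts["unacknowledged"] += 1
        cursor = results["nextCursor"]
        if not results["hasNext"]:
            return counts


for label, stream_id in [
    ("MUTABLE", "alarm_data_live_stream"),
    ("IMMUTABLE", "alarm_data_archive_stream"),
]:
    counts = count_records(stream_id)
    print(f"{label} Stream Statistics:")
    print(f"  Total Records: {counts['total']}")
    print(f"  Unacknowledged (False): {counts['unacknowledged']} records")
    print(f"  Acknowledged (True): {counts['acknowledged']} records")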
MUTABLE Stream Statistics:
  Total Records: 38
  Unacknowledged (False): 38 records

IMMUTABLE Stream Statistics:
  Total Records: 64
  Acknowledged (True): 64 records
This confirms that all 64 acknowledged alarms have been successfully moved from the mutable stream (which now contains only the remaining 38 unacknowledged alarms) to the immutable archive stream. Together, this accounts for the original 102 total records (38 + 64 = 102).
You’ve successfully built a complete SCADA alarm management pipeline using Records! You’ve created schemas, set up mutable and immutable streams, ingested alarm data, updated records, queried them using multiple endpoints, and implemented a stream-to-stream archiving pattern. This pipeline demonstrates the full lifecycle of managing high-volume OT event data with Records.
Advanced: Build a stream-to-stream transformation pipeline
A powerful architectural pattern for Records is creating data pipelines that move and transform data between streams. This separates raw data ingestion from creating clean, standardized datasets for consumption. You’ll build a three-stage pipeline: ingest raw data to a source stream, transform it using a Hosted REST Extractor, and consume the standardized data from a destination stream.
The DataStaging stream template is planned for a future release. Until then, you can use the ImmutableTestStream template to test and implement this pattern.
To build a stream-to-stream transformation pipeline, you need to configure these components:
Source: Connects to the CDF API to read from the source stream’s /sync endpoint.
Destination: Provides credentials to write transformed records back to CDF.
Mapping: Transforms records from the source stream format to the destination stream format.
Job: Ties everything together and runs on a schedule with pagination and incremental load handling.
1
Create the source
The source defines how the REST extractor connects to the CDF API to read records from your source stream. Since you’re reading from CDF, use your CDF cluster as the host and configure OAuth client credentials authentication.
Replace <your-cdf-cluster>, <your-client-id>, <your-client-secret>, <your-token-url>, and <your-scopes> with your actual values.
The source credentials must have streamrecords:READ capability scoped to the source stream’s space.
2
Create the destination
The destination provides credentials for the extractor to write transformed records to CDF. Create a session and use its nonce to authenticate the destination.
from cognite.client import CogniteClient
from cognite.client.data_classes.hosted_extractors import DestinationWrite, SessionWrite
from cognite.client.data_classes.iam import ClientCredentials

client = CogniteClient()

# Create a session for the destination
session = client.iam.sessions.create(
    client_credentials=ClientCredentials(
        client_id="<your-client-id>",
        client_secret="<your-client-secret>",
    ),
    session_type="CLIENT_CREDENTIALS",
)

# Create the destination using the session nonce
destination = client.hosted_extractors.destinations.create(
    DestinationWrite(
        external_id="my-records-destination",
        credentials=SessionWrite(session.nonce),
    )
)
The destination credentials must have streamrecords:WRITE capability scoped to the destination stream’s space.
3
Create the mapping
The mapping defines how to transform each record from the source stream into the output format for the destination stream. The /sync endpoint returns a response containing an items array. The mapping iterates over these items and transforms each one.

For writing to Records streams, the output must include:
type: Set to "immutable_record" for immutable streams.
space: The space where the destination stream exists.
stream: The external ID of the destination stream.
externalId: A unique identifier for each record.
sources: An array of container sources with their properties.
beta: Set to true as records destination is currently in beta.
See custom mappings for the full mapping language reference.
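Conceptually, each item the mapping emits must have the shape below, shown here as a Python dict for illustration. The destination stream ID and property values are placeholders, and the inner sources layout is assumed to mirror the ingestion payload used earlier in this guide.

# Shape of one transformed record the mapping must emit (illustrative values)
transformed_record = {
    "type": "immutable_record",           # required for immutable destination streams
    "space": "scada-alarms",              # space where the destination stream exists
    "stream": "alarm_data_standardized",  # external ID of the destination stream (placeholder)
    "externalId": "alarm-record-0001",    # unique identifier for the record (placeholder)
    "sources": [                          # container sources with their properties
        {
            "source": {
                "type": "container",
                "space": "scada-alarms",
                "externalId": "alarm_events",
            },
            "properties": {
                "alarm_id": "ALARM-PRESS-001",
                "severity": "CRITICAL",
            },
        }
    ],
    "beta": True,                         # records destination is currently in beta
}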
4
Create the job
The job ties everything together. It reads from the source stream’s /sync endpoint on a schedule, applies the mapping, and writes to the destination. Configure incremental load to initialize the cursor on the first run, and pagination to handle cursor-based iteration through records.
interval: How often the job runs (for example, 1h, 15m, 1d).
path: The /sync endpoint path for the source stream.
body: The base request body with limit and filter parameters.
incremental_load: Sets initializeCursor on the first request to start syncing from a specific point (for example, 30d-ago).
pagination: Uses nextCursor from the response to paginate through all records. Returns null to stop when no more records exist.
The incremental_load expression merges the base body with the initializeCursor parameter. The pagination expression checks if nextCursor exists in the response and either continues with the cursor or returns null to stop pagination.
5
Monitor the job
After creating the job, you can monitor its status, logs, and metrics.
Expected outcome: The job runs on the configured interval, reads records from the source stream, transforms them using your mapping, and writes them to the destination stream.
This composable architecture lets you ingest data once in its raw format, then create streams containing standardized, use-case-specific record schemas from the source stream. This builds a robust and maintainable data processing ecosystem within CDF for your high-volume OT event and log data.