Build a SCADA alarm management pipeline with Records
Learn how to ingest, transform, and query high-volume OT event data using the Records service with this comprehensive guide.
If you’re working with high-volume operational technology (OT) event data—think SCADA alarms, sensor readings, or equipment logs—you’ve likely hit the limits of traditional data pipelines. The Records service in Cognite Data Fusion (CDF) is built specifically for this: ingesting millions of events per second, updating records in real time, and querying them efficiently.
In this tutorial, you’ll build a complete SCADA alarm management pipeline that demonstrates the full Records workflow. You’ll create schemas for alarm events, set up mutable and immutable streams for different data lifecycles, ingest and update records, query them using multiple endpoints, and implement a stream-to-stream archiving pattern. By the end, you’ll have a working example that handles live alarm updates, real-time queries, and automated archival.
If you’re new to Records concepts like spaces, containers, and streams, check out Records and streams for background.
You’ll build a complete SCADA alarm management system that handles the full lifecycle of alarm events. This pipeline will create a mutable stream for live alarms that need real-time updates, ingest alarm data from your source systems, query records using different endpoints for various use cases, and automatically archive acknowledged alarms to an immutable stream for long-term storage.
This pattern mirrors what you’d deploy in production for managing high-volume industrial alarm data.
1
Create a space and define the alarm schema
Start by defining the schema for your alarm records. You’ll create a space to organize your data and a container that defines the structure of each alarm event. If you’re new to these concepts, check out the data modeling concepts for background.
Create a space to serve as a namespace for your SCADA alarm data. This keeps your alarm schemas and records organized and separate from other data in your CDF project.
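As a minimal sketch (assuming the Python SDK’s data modeling spaces API), the snippet below creates the scada-alarms space used throughout this tutorial; the display name and description are illustrative.
Example: create the scada-alarms space
from cognite.client import CogniteClient
from cognite.client.data_classes.data_modeling import SpaceApply

client = CogniteClient()

# Create (or update) the space that will hold the alarm schema and records
space = client.data_modeling.spaces.apply(
    SpaceApply(
        space="scada-alarms",
        name="SCADA Alarms",  # illustrative display name
        description="Namespace for SCADA alarm schemas and records",  # illustrative
    )
)
print(space.space)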
Create and define the container that specifies what properties each alarm record will have. This schema includes the fields you’d typically see in SCADA systems: alarm identifiers, severity levels, numeric values, priorities, acknowledgment status, and timestamps.
When creating containers for records, you must set usedFor to "record". Records can only be ingested into containers specifically designated for records.
Example: create SCADA alarm container
from cognite.client import CogniteClient
from cognite.client.data_classes.data_modeling import (
    ContainerApply,
    ContainerProperty,
    Text,
    Float64,
    Int64,
    Boolean,
    Timestamp,
)

client = CogniteClient()
container_id = "alarm_events"

container = client.data_modeling.containers.apply(
    ContainerApply(
        space="scada-alarms",
        external_id=container_id,
        name="SCADA Alarm Events",
        description="Container for SCADA alarm event data",
        used_for="record",
        properties={
            # Text property - alarm identifier
            "alarm_id": ContainerProperty(
                type=Text(is_list=False),
                nullable=False,
                name="Alarm ID",
                description="Unique identifier for the alarm",
            ),
            # Text property - alarm severity
            "severity": ContainerProperty(
                type=Text(is_list=False),
                nullable=False,
                name="Severity",
                description="Alarm severity level (CRITICAL, HIGH, MEDIUM, LOW)",
            ),
            # Float property - alarm value
            "value": ContainerProperty(
                type=Float64(is_list=False),
                nullable=False,
                name="Value",
                description="Numeric value associated with the alarm condition",
            ),
            # Int property - priority score
            "priority": ContainerProperty(
                type=Int64(is_list=False),
                nullable=False,
                name="Priority",
                description="Priority score (1-100)",
            ),
            # Boolean property - acknowledged status
            "is_acknowledged": ContainerProperty(
                type=Boolean(is_list=False),
                nullable=False,
                name="Acknowledged Status",
                description="True if alarm has been acknowledged",
            ),
            # Timestamp property - when alarm occurred
            "timestamp": ContainerProperty(
                type=Timestamp(is_list=False),
                nullable=False,
                name="Timestamp",
                description="Time when the alarm occurred",
            ),
        },
    )
)
2
Create streams for different lifecycles
Now create the streams that will hold your alarm records. Streams need a stream settings template that defines their behavior, performance characteristics, and lifecycle policies. For this pipeline, you’ll create two streams to handle different stages of the alarm lifecycle.
Choose your stream template based on data lifecycle and throughput requirements, not by record type or source system. For detailed specifications of each template’s limits, see the Streams API documentation.
Create a mutable stream for active alarms that need real-time updates, like when an operator acknowledges an alarm. This stream uses the BasicLiveData template, which is optimized for data that changes frequently.
Create an immutable stream for archived alarms that won’t change after they’re written. This stream uses the ImmutableTestStream template, which is designed for write-once, read-many scenarios like long-term archival.
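Both streams can be created through the Streams API. The sketch below is an illustration, not a definitive request: it reuses the stream identifiers referenced later in this tutorial (alarm_data_live_stream and alarm_data_archive_stream), but the exact shape of the creation payload (here a settings object with a template field) is an assumption; check the Streams API documentation for the current request schema.
Example: create the mutable and immutable streams (sketch)
from cognite.client import CogniteClient

client = CogniteClient()

# Stream identifiers reused by the ingestion and archiving examples later on.
# NOTE: the payload shape (settings.template) is an assumption for illustration;
# see the Streams API reference for the exact stream-creation schema.
streams = [
    {
        "externalId": "alarm_data_live_stream",
        "settings": {"template": "BasicLiveData"},  # mutable, for live alarms
    },
    {
        "externalId": "alarm_data_archive_stream",
        "settings": {"template": "ImmutableTestStream"},  # immutable, for archived alarms
    },
]

for stream in streams:
    response = client.post(
        url=f"/api/v1/projects/{client.config.project}/streams",
        json={"items": [stream]},
    )
    print(stream["externalId"], response.status_code)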
3
Ingest alarm records
Now you’ll ingest alarm records into your mutable stream. In production, you’d typically use a Cognite extractor that reads directly from your source system and writes to Records. For this tutorial, you’ll use the API directly to simulate the ingestion process.
The Records service doesn’t support onboarding data using RAW and Transformations. Always ingest data directly into a Records stream, either with one of the supported extractors or by writing to the Records API.
When writing records to the API, you need to specify the target stream identifier and include the container’s space.externalId to identify which schema the data follows.
Start by ingesting a couple of hardcoded alarm records to verify your setup works. This gives you a small dataset to test queries and updates before adding more data.
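As a sketch of this first step, the example below ingests two hardcoded alarms into the mutable stream using the same request shape as the larger ingestion example that follows; the alarm values are illustrative and chosen to line up with the update and query examples later in the tutorial.
Example: ingest hardcoded alarm records (sketch)
from cognite.client import CogniteClient
import uuid

client = CogniteClient()

space_id = "scada-alarms"
container_id = "alarm_events"
mutable_stream_id = "alarm_data_live_stream"

# Two hardcoded alarms to verify the setup before ingesting a larger dataset
hardcoded_records = [
    {
        "alarm_id": "ALARM-PRESS-001",
        "severity": "CRITICAL",
        "value": 125.8,
        "priority": 95,
        "is_acknowledged": False,
        "timestamp": "2025-10-22T10:00:00.000Z",
    },
    {
        "alarm_id": "ALARM-TEMP-042",
        "severity": "MEDIUM",
        "value": 88.3,
        "priority": 60,
        "is_acknowledged": False,
        "timestamp": "2025-10-22T10:00:00.000Z",
    },
]

# Wrap each record with its target space, a generated externalId, and the container source
items = [
    {
        "space": space_id,
        "externalId": str(uuid.uuid4()),
        "sources": [
            {
                "source": {
                    "type": "container",
                    "space": space_id,
                    "externalId": container_id,
                },
                "properties": record,
            }
        ],
    }
    for record in hardcoded_records
]

# Ingest the hardcoded records into the mutable stream
response = client.post(
    url=f"/api/v1/projects/{client.config.project}/streams/{mutable_stream_id}/records",
    json={"items": items},
)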
Add 100 random alarm records to simulate a realistic dataset. This larger dataset will make your queries and aggregations more meaningful and help you see how Records handles volume.
Example: ingest random alarm records for larger dataset
from cognite.client import CogniteClient
from datetime import datetime, timedelta
import random
import uuid

client = CogniteClient()

space_id = "scada-alarms"
container_id = "alarm_events"
mutable_stream_id = "alarm_data_live_stream"

# Generate 100 random alarm records
alarm_ids = [
    "ALARM-PRESS-001",
    "ALARM-TEMP-042",
    "ALARM-FLOW-013",
    "ALARM-VIBR-088",
    "ALARM-LEVEL-055",
]
severities = ["CRITICAL", "HIGH", "MEDIUM", "LOW"]
base_time = datetime.now() - timedelta(hours=2)

random_records = [
    {
        "alarm_id": random.choice(alarm_ids),
        "severity": random.choice(severities),
        "value": round(random.uniform(50.0, 150.0), 2),
        "priority": random.randint(20, 100),
        "is_acknowledged": random.choice([True, False]),
        "timestamp": (base_time + timedelta(minutes=i)).strftime("%Y-%m-%dT%H:%M:%S.000Z"),
    }
    for i in range(100)
]

# Prepare random records for ingestion
items = []
for record in random_records:
    items.append(
        {
            "space": space_id,
            "externalId": str(uuid.uuid4()),
            "sources": [
                {
                    "source": {
                        "type": "container",
                        "space": space_id,
                        "externalId": container_id,
                    },
                    "properties": record,
                }
            ],
        }
    )

# Ingest random records
response = client.post(
    url=f"/api/v1/projects/{client.config.project}/streams/{mutable_stream_id}/records",
    json={"items": items},
)
4
Update records (mutable streams only)
One advantage of mutable streams is the ability to update records after they’re written. This is essential for scenarios like acknowledging alarms when an operator responds, updating status fields as conditions change, or correcting data errors. Use the upsert endpoint to update existing records by their space and externalId.
To update a record, you need its externalId. In practice, you’d typically get this from a previous query or store it when you first ingest the record. For this example, you’ll update one of the alarm records you ingested earlier to mark it as acknowledged.
Example: update an alarm to acknowledged status
from cognite.client import CogniteClient

client = CogniteClient()

space_id = "scada-alarms"
container_id = "alarm_events"
mutable_stream_id = "alarm_data_live_stream"

# Assuming you have the external_id of a record you want to update
# In practice, you would get this from a previous query or store it during ingestion
record_external_id = "your-record-external-id"  # Replace with actual ID

# Update the record to set is_acknowledged to True
updated_record = {
    "alarm_id": "ALARM-TEMP-042",
    "severity": "MEDIUM",
    "value": 88.3,
    "priority": 60,
    "is_acknowledged": True,
    "timestamp": "2025-10-22T10:00:00.000Z",
}

response = client.post(
    url=f"/api/v1/projects/{client.config.project}/streams/{mutable_stream_id}/records/upsert",
    json={
        "items": [
            {
                "space": space_id,
                "externalId": record_external_id,
                "sources": [
                    {
                        "source": {
                            "type": "container",
                            "space": space_id,
                            "externalId": container_id,
                        },
                        "properties": updated_record,
                    }
                ],
            }
        ]
    },
)
5
Query records
Next, you’ll query your alarm records using the Records API. The API provides three endpoints for consuming data, each optimized for different use cases:
For interactive queries, use the filter endpoint.
For large datasets with pagination, use the sync endpoint.
For statistical analysis, use the aggregate endpoint.
The query syntax for filtering is similar across all endpoints.
When querying records, we recommend that you use the hasData filter to ensure you only retrieve records that contain data in the container you’re querying. A record can exist in a stream but may not have data for every container. The hasData filter ensures efficient queries by excluding records that don’t have data in the selected container.
Time-based filtering with lastUpdatedTime
The lastUpdatedTime filter is mandatory for filter and aggregate queries on immutable streams. This filter defines the time range for retrieving records:
gt (greater than): The start of the time range (required for immutable streams)
lt (less than): The end of the time range (optional, defaults to current time if not specified)
The maximum time range you can query is defined by the stream’s maxFilteringInterval setting, which is determined by the stream template you chose when the stream was created. You can check the limit by retrieving the stream’s settings at /api/v1/projects/{project}/streams/{streamId} and inspecting settings.limits.maxFilteringInterval.
For mutable streams, lastUpdatedTime is optional, but we recommend that you use it for improved query performance.
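As a sketch only, the fragment below shows how a lastUpdatedTime range and a hasData clause might be expressed for the alarm container. The exact placement of lastUpdatedTime in the request body and the expected timestamp format are assumptions, so check the Records API reference before using it.
Example: time range and hasData clauses (sketch)
from datetime import datetime, timedelta, timezone

space_id = "scada-alarms"
container_id = "alarm_events"

# Time range covering the last 24 hours. Epoch milliseconds are assumed here;
# verify the expected timestamp format and placement in the Records API reference.
now = datetime.now(timezone.utc)
last_updated_time = {
    "gt": int((now - timedelta(hours=24)).timestamp() * 1000),  # start of range (required for immutable streams)
    "lt": int(now.timestamp() * 1000),  # end of range (optional, defaults to now)
}

# hasData clause restricting results to records with data in the alarm container
has_data_filter = {
    "hasData": [
        {"type": "container", "space": space_id, "externalId": container_id}
    ]
}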
Use the filter endpoint for interactive applications, dashboards, and any scenario where you expect a relatively small, bounded result set. It returns an unpaginated result set with a maximum of 1,000 records per request and supports custom sorting on any property.
This endpoint is ideal for queries like “show me the top 10 unacknowledged alarms sorted by priority.”
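The sketch below shows what such a filter request could look like for the top 10 unacknowledged alarms sorted by priority. The filter clauses mirror the sync example later in this step, while the sort clause’s field names are assumptions to verify against the Records API reference.
Example: filter top unacknowledged alarms by priority (sketch)
from cognite.client import CogniteClient

client = CogniteClient()

space_id = "scada-alarms"
container_id = "alarm_events"
mutable_stream_id = "alarm_data_live_stream"

# Filter request: top 10 unacknowledged alarms sorted by priority.
# NOTE: the sort clause field names are assumptions for illustration;
# check the Records API reference for the exact sorting syntax.
response = client.post(
    url=f"/api/v1/projects/{client.config.project}/streams/{mutable_stream_id}/records/filter",
    json={
        "sources": [
            {
                "source": {
                    "type": "container",
                    "space": space_id,
                    "externalId": container_id,
                },
                "properties": ["*"],
            }
        ],
        "filter": {
            "and": [
                {
                    "hasData": [
                        {
                            "type": "container",
                            "space": space_id,
                            "externalId": container_id,
                        }
                    ]
                },
                {
                    "equals": {
                        "property": [space_id, container_id, "is_acknowledged"],
                        "value": False,
                    }
                },
            ]
        },
        "sort": [
            {
                "property": [space_id, container_id, "priority"],
                "direction": "descending",
            }
        ],
        "limit": 10,
    },
)

# Print the returned alarms, highest priority first
for item in response.json().get("items", []):
    props = item["properties"][space_id][container_id]
    print(props["alarm_id"], props["severity"], props["priority"])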
Use the sync endpoint for batch processing, data exports, or any application that needs to retrieve a large number of records efficiently. It returns a paginated result set—you iterate through the data by making subsequent requests with the cursor returned in the previous response. The sync process is complete when the API response has hasNext set to false.
The sync endpoint supports a maximum of 1,000 records per page and doesn’t support custom sorting. The read throughput is governed by the stream settings defined by the stream template you chose when creating the stream.
This endpoint is designed for workflows like “export all recent alarm events for analysis” or for implementing incremental data pipelines.
Example: sync recent alarm records
from cognite.client import CogniteClient
from tabulate import tabulate

client = CogniteClient()

space_id = "scada-alarms"
container_id = "alarm_events"
mutable_stream_id = "alarm_data_live_stream"

# Sync records from the last 2 days
response = client.post(
    url=f"/api/v1/projects/{client.config.project}/streams/{mutable_stream_id}/records/sync",
    json={
        "sources": [
            {
                "source": {
                    "type": "container",
                    "space": space_id,
                    "externalId": container_id,
                },
                "properties": ["*"],
            }
        ],
        "filter": {
            "hasData": [
                {
                    "type": "container",
                    "space": space_id,
                    "externalId": container_id,
                }
            ]
        },
        "initializeCursor": "2d-ago",
        "limit": 10,
    },
)

results = response.json()
items = results.get("items", [])

# Display as table
table_data = []
for item in items[:5]:  # Show first 5
    props = item["properties"][space_id][container_id]
    ack_status = "ACK" if props["is_acknowledged"] else "UNACK"
    table_data.append([
        props["alarm_id"],
        props["severity"],
        ack_status,
        props["value"],
        props["timestamp"],
    ])

print(f"Sync returned {len(items)} records (showing first 5):")
print(tabulate(
    table_data,
    headers=["Alarm ID", "Severity", "Status", "Value", "Timestamp"],
    tablefmt="github",
))

if "nextCursor" in results:
    cursor = results["nextCursor"]
    cursor_preview = f"{cursor[:5]}..." if len(cursor) > 5 else cursor
    print(f"\nNext cursor for incremental sync: {cursor_preview}")
Expected output:
Sync returned 10 records (showing first 5):
| Alarm ID        | Severity | Status | Value  | Timestamp                |
|-----------------|----------|--------|--------|--------------------------|
| ALARM-PRESS-001 | CRITICAL | UNACK  | 125.8  | 2025-10-22T10:00:00.000Z |
| ALARM-TEMP-042  | LOW      | UNACK  | 98.78  | 2025-10-24T11:27:54.000Z |
| ALARM-VIBR-088  | CRITICAL | ACK    | 122.78 | 2025-10-24T11:28:54.000Z |
| ALARM-VIBR-088  | MEDIUM   | ACK    | 140.56 | 2025-10-24T11:29:54.000Z |
| ALARM-LEVEL-055 | CRITICAL | ACK    | 50.59  | 2025-10-24T11:30:54.000Z |

Next cursor for incremental sync: 08c0e...
Use the aggregate endpoint to perform statistical analysis on your records, such as counts, averages, sums, and other aggregations. It returns aggregated results based on the specified aggregation functions and groupings, optimized for analytical queries that require summarizing large datasets without retrieving individual records.
This endpoint is ideal for generating reports like “count of alarms by severity” or “average alarm value and priority statistics.”
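The sketch below outlines what a count-by-severity aggregation might look like. The aggregates and groupBy fields in particular are assumptions for illustration; check the Records API reference for the exact aggregation request schema supported by your project.
Example: count alarms by severity (sketch)
from cognite.client import CogniteClient

client = CogniteClient()

space_id = "scada-alarms"
container_id = "alarm_events"
mutable_stream_id = "alarm_data_live_stream"

# Aggregate request: count alarms grouped by severity.
# NOTE: the aggregates/groupBy field names are assumptions for illustration;
# verify the aggregation syntax in the Records API reference.
response = client.post(
    url=f"/api/v1/projects/{client.config.project}/streams/{mutable_stream_id}/records/aggregate",
    json={
        "filter": {
            "hasData": [
                {
                    "type": "container",
                    "space": space_id,
                    "externalId": container_id,
                }
            ]
        },
        "aggregates": [
            {"count": {"property": [space_id, container_id, "alarm_id"]}}
        ],
        "groupBy": [[space_id, container_id, "severity"]],
    },
)

# Print the raw aggregation response
print(response.json())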
6
Archive acknowledged alarms to the immutable stream
The final step of this tutorial is to implement a common Records pattern: moving data between streams based on lifecycle stages. You’ll archive acknowledged alarms by moving them from the mutable stream to the immutable stream. This stream-to-stream pattern is useful for:
Hot/cold storage: keep active data in mutable streams, archive historical data in immutable streams
Data lifecycle management: move processed records to long-term storage automatically
Performance optimization: keep active streams lean by moving old data to archives
The key to reliable stream-to-stream operations is using the sync endpoint with cursor-based pagination. The cursor tracks your position in the stream, allowing you to resume from where you left off if processing fails or is interrupted.
Example: move acknowledged alarms with pagination
from cognite.client import CogniteClient

client = CogniteClient()

space_id = "scada-alarms"
container_id = "alarm_events"
mutable_stream_id = "alarm_data_live_stream"
immutable_stream_id = "alarm_data_archive_stream"

# In a production system, you would persist this cursor between runs
stored_cursor = None

# Pagination settings
BATCH_SIZE = 10
total_records_moved = 0
batch_number = 0

print("Starting pagination loop to archive acknowledged alarms...")

# Keep paginating until there are no more acknowledged records
while True:
    batch_number += 1
    print(f"\n--- Batch {batch_number} ---")

    # Sync acknowledged records from MUTABLE stream
    sync_response = client.post(
        url=f"/api/v1/projects/{client.config.project}/streams/{mutable_stream_id}/records/sync",
        json={
            "sources": [
                {
                    "source": {
                        "type": "container",
                        "space": space_id,
                        "externalId": container_id,
                    },
                    "properties": ["*"],
                }
            ],
            # Use stored cursor if available, otherwise initialize from 2 days ago
            **(
                {"cursor": stored_cursor}
                if stored_cursor
                else {"initializeCursor": "2d-ago"}
            ),
            "filter": {
                "and": [
                    {
                        "hasData": [
                            {
                                "type": "container",
                                "space": space_id,
                                "externalId": container_id,
                            }
                        ]
                    },
                    {
                        "equals": {
                            "property": [space_id, container_id, "is_acknowledged"],
                            "value": True,
                        }
                    },
                ]
            },
            "limit": BATCH_SIZE,
        },
    )

    sync_results = sync_response.json()
    acknowledged_records = sync_results.get("items", [])
    next_cursor = sync_results["nextCursor"]
    has_next = sync_results["hasNext"]

    print(f"Found {len(acknowledged_records)} acknowledged records in this batch")
    print(f"More data immediately available: {has_next}")

    # If no records found, we're done
    if len(acknowledged_records) == 0:
        print("No more acknowledged records to move")
        stored_cursor = next_cursor  # Update cursor even when no records found
        break

    # Prepare records for ingestion into immutable stream
    print(f"Ingesting {len(acknowledged_records)} records to IMMUTABLE stream...")
    immutable_items = []
    records_to_delete = []

    for record in acknowledged_records:
        # Extract properties and identifiers
        record_space = record["space"]
        record_external_id = record["externalId"]
        props = record["properties"][space_id][container_id]

        # Store record identifier for deletion
        records_to_delete.append({
            "space": record_space,
            "externalId": record_external_id,
        })

        # Prepare for immutable stream
        immutable_items.append({
            "space": record_space,
            "externalId": record_external_id,
            "sources": [
                {
                    "source": {
                        "type": "container",
                        "space": space_id,
                        "externalId": container_id,
                    },
                    "properties": props,
                }
            ],
        })

    # Ingest into immutable stream
    immutable_res = client.post(
        url=f"/api/v1/projects/{client.config.project}/streams/{immutable_stream_id}/records",
        json={"items": immutable_items},
    )

    # Check if ingest was successful (200/201: sync success, 202: async processing)
    if immutable_res.status_code not in [200, 201, 202]:
        print(f"Warning: Immutable ingest returned status {immutable_res.status_code}")
        print(f"Response: {immutable_res.text}")
        break
    else:
        print(f"✓ Ingested {len(immutable_items)} records to IMMUTABLE stream")

    # Delete acknowledged records from MUTABLE stream
    print(f"Deleting {len(records_to_delete)} records from MUTABLE stream...")
    delete_res = client.post(
        url=f"/api/v1/projects/{client.config.project}/streams/{mutable_stream_id}/records/delete",
        json={"items": records_to_delete},
    )
    print(f"✓ Deleted {len(records_to_delete)} records from MUTABLE stream")

    # IMPORTANT: Only update cursor after BOTH operations succeed
    stored_cursor = next_cursor
    total_records_moved += len(acknowledged_records)

    cursor_preview = f"{stored_cursor[:20]}..." if len(stored_cursor) > 20 else stored_cursor
    print(f"✓ Cursor updated: {cursor_preview}")
    print(f"Total records moved so far: {total_records_moved}")

print(f"\n{'=' * 70}")
print("PAGINATION COMPLETE")
print(f"{'=' * 70}")
print(f"Total batches processed: {batch_number}")
print(f"Total records moved: {total_records_moved}")
print(f"{'=' * 70}")
Expected output:
Starting pagination loop to archive acknowledged alarms...

--- Batch 1 ---
Found 10 acknowledged records in this batch
More data immediately available: True
Ingesting 10 records to IMMUTABLE stream...
✓ Ingested 10 records to IMMUTABLE stream
Deleting 10 records from MUTABLE stream...
✓ Deleted 10 records from MUTABLE stream
✓ Cursor updated: 08c0b89cecbeb1dab818...
Total records moved so far: 10

--- Batch 2 ---
Found 10 acknowledged records in this batch
More data immediately available: True
Ingesting 10 records to IMMUTABLE stream...
✓ Ingested 10 records to IMMUTABLE stream
Deleting 10 records from MUTABLE stream...
✓ Deleted 10 records from MUTABLE stream
✓ Cursor updated: 08c0aae0c3c0b1dab818...
Total records moved so far: 20

--- Batch 3 ---
Found 10 acknowledged records in this batch
More data immediately available: True
Ingesting 10 records to IMMUTABLE stream...
✓ Ingested 10 records to IMMUTABLE stream
Deleting 10 records from MUTABLE stream...
✓ Deleted 10 records from MUTABLE stream
✓ Cursor updated: 08c0e5b4cdc4b1dab818...
Total records moved so far: 30

[... batches 4-6 similar, each moving 10 records ...]

--- Batch 7 ---
Found 4 acknowledged records in this batch
More data immediately available: False
Ingesting 4 records to IMMUTABLE stream...
✓ Ingested 4 records to IMMUTABLE stream
Deleting 4 records from MUTABLE stream...
✓ Deleted 4 records from MUTABLE stream
✓ Cursor updated: 0880beeae1c9b1dab818...
Total records moved so far: 64

--- Batch 8 ---
Found 0 acknowledged records in this batch
More data immediately available: False
No more acknowledged records to move

======================================================================
PAGINATION COMPLETE
======================================================================
Total batches processed: 8
Total records moved: 64
======================================================================
Example: verify final distribution across streams
After archiving, verify that the records were moved correctly by querying both streams to see the final distribution of acknowledged and unacknowledged alarms.
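The sketch below performs that check by syncing both streams and counting acknowledged and unacknowledged records client-side; it assumes the full dataset fits within a single 1,000-record sync page.
from cognite.client import CogniteClient

client = CogniteClient()

space_id = "scada-alarms"
container_id = "alarm_events"
streams = {
    "MUTABLE": "alarm_data_live_stream",
    "IMMUTABLE": "alarm_data_archive_stream",
}

for label, stream_id in streams.items():
    # Sync records written in the last 2 days and count them client-side
    response = client.post(
        url=f"/api/v1/projects/{client.config.project}/streams/{stream_id}/records/sync",
        json={
            "sources": [
                {
                    "source": {
                        "type": "container",
                        "space": space_id,
                        "externalId": container_id,
                    },
                    "properties": ["*"],
                }
            ],
            "filter": {
                "hasData": [
                    {
                        "type": "container",
                        "space": space_id,
                        "externalId": container_id,
                    }
                ]
            },
            "initializeCursor": "2d-ago",
            "limit": 1000,
        },
    )
    items = response.json().get("items", [])

    # Count acknowledged vs. unacknowledged records in this stream
    acknowledged = sum(
        1 for item in items
        if item["properties"][space_id][container_id]["is_acknowledged"]
    )
    unacknowledged = len(items) - acknowledged

    print(f"{label} Stream Statistics:")
    print(f"  Total Records: {len(items)}")
    print(f"  Acknowledged (True): {acknowledged} records")
    print(f"  Unacknowledged (False): {unacknowledged} records")
Expected output: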
MUTABLE Stream Statistics:
  Total Records: 38
  Unacknowledged (False): 38 records

IMMUTABLE Stream Statistics:
  Total Records: 64
  Acknowledged (True): 64 records
This confirms that all 64 acknowledged alarms have been successfully moved from the mutable stream (which now contains only the remaining 38 unacknowledged alarms) to the immutable archive stream. Together, this accounts for the original 102 total records (38 + 64 = 102).
You’ve successfully built a complete SCADA alarm management pipeline using Records! You’ve created schemas, set up mutable and immutable streams, ingested alarm data, updated records, queried them using multiple endpoints, and implemented a stream-to-stream archiving pattern. This pipeline demonstrates the full lifecycle of managing high-volume OT event data with Records.
Advanced: Build a stream-to-stream transformation pipeline
A powerful architectural pattern for Records is creating data pipelines that move and transform data between streams. This separates raw data ingestion from creating clean, standardized datasets for consumption. You’ll build a three-stage pipeline: ingest raw data to a staging stream, transform it using a Hosted REST Extractor, and consume the standardized data from an archive stream.
The DataStaging stream template is planned for a future release. Until then, you can use the ImmutableTestStream template to test and implement this pattern.
To build a stream-to-stream transformation pipeline:
1
Ingest raw data to a staging stream
Configure an extractor to write raw data from your source system into a staging stream. This stream should contain an exact, untransformed copy of the source data, preserving the original schema from the source system. Use the ImmutableTestStream template (or DataStaging when available) for this staging stream.
Expected outcome: Raw data from your source system is stored in a staging stream with the original schema intact.
2
Transform and standardize data
Use the Hosted REST Extractor to create a stream-to-stream pipeline that transforms your staging data:
Configure the extractor to continuously read from the staging stream’s /sync endpoint.
Write the transformed, standardized records to a permanent archive stream (for example, BasicArchive).
Expected outcome: Transformed, standardized records are written to your archive stream with a clean schema optimized for consumption.
3
Consume from the archive stream
Point your applications, dashboards, and analytical jobs to the clean, standardized archive stream for reliable high-performance consumption. This stream contains only the transformed data with your standardized schema.
Expected outcome: Your applications consume clean, standardized data from the archive stream without needing to handle raw source data transformations.
This composable architecture lets you ingest data once in its raw format for fidelity and improved data trust, then create streams containing standardized, use-case-specific record schemas from the source stream. This builds a robust and maintainable data processing ecosystem within CDF for your high-volume OT event and log data.
The mapping defines how to transform each input record from the source stream into the output format for the destination stream. The mapping receives each record as input and must produce an output object with type: "immutable_record" along with the target space, stream, and container sources.
Example: transformation mapping for Hosted REST Extractor
The Hosted REST Extractor expects the space, container, and stream to have been created before the mapping can be successfully applied by the extractor job.