If you’re working with high-volume operational technology (OT) event data—think SCADA alarms, sensor readings, or equipment logs—you’ve likely hit the limits of traditional data pipelines. The Records service in Cognite Data Fusion (CDF) is built specifically for this: ingesting millions of events per second, updating records in real time, and querying them efficiently. In this tutorial, you’ll build a complete SCADA alarm management pipeline that demonstrates the full Records workflow. You’ll create schemas for alarm events, set up mutable and immutable streams for different data lifecycles, ingest and update records, query them using multiple endpoints, and implement a stream-to-stream archiving pattern. By the end, you’ll have a working example that handles live alarm updates, real-time queries, and automated archival. If you’re new to Records concepts like spaces, containers, and streams, check out Records and streams for background.

Prerequisites

  1. Configure access control for your user or service account. The example below creates a group with the data modeling, streams, and stream records capabilities used throughout this tutorial.
  2. A Python environment with the cognite-sdk package installed. The query examples also use the tabulate package to format output.
from cognite.client import CogniteClient
from cognite.client.data_classes import GroupWrite
from cognite.client.data_classes.capabilities import (
    DataModelsAcl,
    StreamRecordsAcl,
    StreamsAcl,
)

client = CogniteClient()

my_group = client.iam.groups.create(
    GroupWrite(
        name="Records Access Group",
        capabilities=[
            DataModelsAcl(
                actions=[
                    DataModelsAcl.Action.Read,
                    DataModelsAcl.Action.Write
                ],
                scope=DataModelsAcl.Scope.All(),
            ),
            StreamRecordsAcl(
                actions=[
                    StreamRecordsAcl.Action.Read,
                    StreamRecordsAcl.Action.Write
                ],
                scope=StreamRecordsAcl.Scope.All(),
            ),
            StreamsAcl(
                actions=[
                    StreamsAcl.Action.Read,
                    StreamsAcl.Action.Create,
                    StreamsAcl.Action.Delete,
                ],
                scope=StreamsAcl.Scope.All(),
            ),
        ],
    )
)
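
After the group is in place, you can optionally sanity-check that your credentials resolve to the expected project and capabilities by inspecting the current token. A quick check, not required for the rest of the tutorial:
from cognite.client import CogniteClient

client = CogniteClient()

# Inspect the token used by the client to see which project and capabilities it resolves to.
token_info = client.iam.token.inspect()
print(token_info.subject)
print(token_info.projects)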

Build a SCADA alarm management pipeline

You’ll build a complete SCADA alarm management system that handles the full lifecycle of alarm events. This pipeline will create a mutable stream for live alarms that need real-time updates, ingest alarm data from your source systems, query records using different endpoints for various use cases, and automatically archive acknowledged alarms to an immutable stream for long-term storage. This pattern mirrors what you’d deploy in production for managing high-volume industrial alarm data.

Step 1: Create a space and define the alarm schema

Start by defining the schema for your alarm records. You’ll create a space to organize your data and a container that defines the structure of each alarm event. If you’re new to these concepts, check out the data modeling concepts for background.
  1. Create a space to serve as a namespace for your SCADA alarm data. This keeps your alarm schemas and records organized and separate from other data in your CDF project.
from cognite.client import CogniteClient
from cognite.client.data_classes.data_modeling import SpaceApply

client = CogniteClient()

space_id = "scada-alarms"
space = client.data_modeling.spaces.apply(
    SpaceApply(
        space=space_id,
        description="SCADA alarm events",
        name="SCADA Alarms",
    )
)
  2. Create the container that defines the properties of each alarm record. This schema includes the fields you’d typically see in SCADA systems: alarm identifiers, severity levels, numeric values, priorities, acknowledgment status, and timestamps.
When creating containers for records, you must set usedFor to "record". Records can only be ingested into containers specifically designated for records.
from cognite.client import CogniteClient
from cognite.client.data_classes.data_modeling import (
    ContainerApply,
    ContainerProperty,
    Text,
    Float64,
    Int64,
    Boolean,
    Timestamp,
)

client = CogniteClient()

container_id = "alarm_events"
container = client.data_modeling.containers.apply(
    ContainerApply(
        space="scada-alarms",
        external_id=container_id,
        name="SCADA Alarm Events",
        description="Container for SCADA alarm event data",
        used_for="record",
        properties={
            # Text property - alarm identifier
            "alarm_id": ContainerProperty(
                type=Text(is_list=False),
                nullable=False,
                name="Alarm ID",
                description="Unique identifier for the alarm",
            ),
            # Text property - alarm severity
            "severity": ContainerProperty(
                type=Text(is_list=False),
                nullable=False,
                name="Severity",
                description="Alarm severity level (CRITICAL, HIGH, MEDIUM, LOW)",
            ),
            # Float property - alarm value
            "value": ContainerProperty(
                type=Float64(is_list=False),
                nullable=False,
                name="Value",
                description="Numeric value associated with the alarm condition",
            ),
            # Int property - priority score
            "priority": ContainerProperty(
                type=Int64(is_list=False),
                nullable=False,
                name="Priority",
                description="Priority score (1-100)",
            ),
            # Boolean property - acknowledged status
            "is_acknowledged": ContainerProperty(
                type=Boolean(is_list=False),
                nullable=False,
                name="Acknowledged Status",
                description="True if alarm has been acknowledged",
            ),
            # Timestamp property - when alarm occurred
            "timestamp": ContainerProperty(
                type=Timestamp(is_list=False),
                nullable=False,
                name="Timestamp",
                description="Time when the alarm occurred",
            ),
        },
    )
)

Step 2: Create streams for different lifecycles

Now create the streams that will hold your alarm records. Streams need a stream settings template that defines their behavior, performance characteristics, and lifecycle policies. For this pipeline, you’ll create two streams to handle different stages of the alarm lifecycle.
Choose your stream template based on data lifecycle and throughput requirements, not by record type or source system. For detailed specifications of each template’s limits, see the Streams API documentation.
  • Create a mutable stream for active alarms that need real-time updates, like when an operator acknowledges an alarm. This stream uses the BasicLiveData template, which is optimized for data that changes frequently.
from cognite.client import CogniteClient

client = CogniteClient()

mutable_stream_id = "alarm_data_live_stream"
response = client.post(
    url=f"/api/v1/projects/{client.config.project}/streams",
    json={
        "items": [
            {
                "externalId": mutable_stream_id,
                "settings": {"template": {"name": "BasicLiveData"}},
            }
        ]
    }
)
  • Create an immutable stream for archived alarms that won’t change after they’re written. This stream uses the ImmutableTestStream template, which is designed for write-once, read-many scenarios like long-term archival.
from cognite.client import CogniteClient

client = CogniteClient()

immutable_stream_id = "alarm_data_archive_stream"
response = client.post(
    url=f"/api/v1/projects/{client.config.project}/streams",
    json={
        "items": [
            {
                "externalId": immutable_stream_id,
                "settings": {"template": {"name": "ImmutableTestStream"}},
            }
        ]
    }
)

Step 3: Ingest records

Now you’ll ingest alarm records into your mutable stream. In production, you’d typically use a Cognite extractor that reads directly from your source system and writes to Records. For this tutorial, you’ll use the API directly to simulate the ingestion process.
The Records service does not support onboarding data using RAW and Transformations. Always ingest data directly into a Records stream using one of the supported extractors.
When writing records to the API, you specify the target stream identifier and reference the container (by its space and externalId) that defines the schema the data follows.
  1. Start by ingesting a couple of hardcoded alarm records to verify your setup works. This gives you a small dataset to test queries and updates before adding more data.
from cognite.client import CogniteClient
import uuid

client = CogniteClient()

space_id = "scada-alarms"
container_id = "alarm_events"
mutable_stream_id = "alarm_data_live_stream"

# Create two hardcoded alarm records
hardcoded_records = [
    {
        "alarm_id": "ALARM-PRESS-001",
        "severity": "CRITICAL",
        "value": 125.8,
        "priority": 95,
        "is_acknowledged": False,
        "timestamp": "2025-10-22T10:00:00.000Z",
    },
    {
        "alarm_id": "ALARM-TEMP-042",
        "severity": "MEDIUM",
        "value": 88.3,
        "priority": 60,
        "is_acknowledged": False,
        "timestamp": "2025-10-22T10:00:00.000Z",
    },
]

# Prepare records for ingestion
items = []
for record in hardcoded_records:
    ext_id = str(uuid.uuid4())
    items.append(
        {
            "space": space_id,
            "externalId": ext_id,
            "sources": [
                {
                    "source": {
                        "type": "container",
                        "space": space_id,
                        "externalId": container_id,
                    },
                    "properties": record,
                }
            ],
        }
    )

# Ingest hardcoded records
response = client.post(
    url=f"/api/v1/projects/{client.config.project}/streams/{mutable_stream_id}/records",
    json={"items": items},
)
  2. Add 100 random alarm records to simulate a realistic dataset. This larger dataset will make your queries and aggregations more meaningful and help you see how Records handles volume.
from cognite.client import CogniteClient
from datetime import datetime, timedelta
import random
import uuid

client = CogniteClient()

space_id = "scada-alarms"
container_id = "alarm_events"
mutable_stream_id = "alarm_data_live_stream"

# Generate 100 random alarm records
alarm_ids = [
    "ALARM-PRESS-001",
    "ALARM-TEMP-042",
    "ALARM-FLOW-013",
    "ALARM-VIBR-088",
    "ALARM-LEVEL-055",
]
severities = ["CRITICAL", "HIGH", "MEDIUM", "LOW"]
base_time = datetime.now() - timedelta(hours=2)

random_records = [
    {
        "alarm_id": random.choice(alarm_ids),
        "severity": random.choice(severities),
        "value": round(random.uniform(50.0, 150.0), 2),
        "priority": random.randint(20, 100),
        "is_acknowledged": random.choice([True, False]),
        "timestamp": (base_time + timedelta(minutes=i)).strftime("%Y-%m-%dT%H:%M:%S.000Z"),
    }
    for i in range(100)
]

# Prepare random records for ingestion
items = []
for record in random_records:
    items.append(
        {
            "space": space_id,
            "externalId": str(uuid.uuid4()),
            "sources": [
                {
                    "source": {
                        "type": "container",
                        "space": space_id,
                        "externalId": container_id,
                    },
                    "properties": record,
                }
            ],
        }
    )

# Ingest random records
response = client.post(
    url=f"/api/v1/projects/{client.config.project}/streams/{mutable_stream_id}/records",
    json={"items": items},
)

Step 4: Update records (mutable streams only)

One advantage of mutable streams is the ability to update records after they’re written. This is essential for scenarios like acknowledging alarms when an operator responds, updating status fields as conditions change, or correcting data errors. Use the upsert endpoint to update existing records by their space and externalId.
To update a record, you need its externalId. In practice, you’d typically get this from a previous query or store it when you first ingest the record; a lookup sketch follows the example below. For this example, you’ll update one of the alarm records you ingested earlier to mark it as acknowledged.
from cognite.client import CogniteClient

client = CogniteClient()

space_id = "scada-alarms"
container_id = "alarm_events"
mutable_stream_id = "alarm_data_live_stream"

# Assuming you have the external_id of a record you want to update
# In practice, you would get this from a previous query or store it during ingestion
record_external_id = "your-record-external-id"  # Replace with actual ID

# Update the record to set is_acknowledged to True
updated_record = {
    "alarm_id": "ALARM-TEMP-042",
    "severity": "MEDIUM",
    "value": 88.3,
    "priority": 60,
    "is_acknowledged": True,
    "timestamp": "2025-10-22T10:00:00.000Z",
}

response = client.post(
    url=f"/api/v1/projects/{client.config.project}/streams/{mutable_stream_id}/records/upsert",
    json={
        "items": [
            {
                "space": space_id,
                "externalId": record_external_id,
                "sources": [
                    {
                        "source": {
                            "type": "container",
                            "space": space_id,
                            "externalId": container_id,
                        },
                        "properties": updated_record,
                    }
                ],
            }
        ]
    },
)
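
If you don’t have a record’s externalId at hand, one option is to look it up with the filter endpoint first and reuse the identifier from the response. A minimal sketch, assuming the records ingested in step 3 are present in the stream:
from cognite.client import CogniteClient

client = CogniteClient()

space_id = "scada-alarms"
container_id = "alarm_events"
mutable_stream_id = "alarm_data_live_stream"

# Find one record for a given alarm_id and reuse its externalId in the upsert call above.
lookup = client.post(
    url=f"/api/v1/projects/{client.config.project}/streams/{mutable_stream_id}/records/filter",
    json={
        "sources": [
            {
                "source": {
                    "type": "container",
                    "space": space_id,
                    "externalId": container_id,
                },
                "properties": ["*"],
            }
        ],
        "filter": {
            "and": [
                {
                    "hasData": [
                        {
                            "type": "container",
                            "space": space_id,
                            "externalId": container_id,
                        }
                    ]
                },
                {
                    "equals": {
                        "property": [space_id, container_id, "alarm_id"],
                        "value": "ALARM-TEMP-042",
                    }
                },
            ]
        },
        "limit": 1,
    },
)

items = lookup.json().get("items", [])
if items:
    record_external_id = items[0]["externalId"]
    print(f"Record to update: {record_external_id}")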

Step 5: Query records

Next, you’ll query your alarm records using the Records API. The API provides three endpoints for consuming data, each optimized for different use cases:
  • For interactive queries, use the filter endpoint.
  • For large datasets with pagination, use the sync endpoint.
  • For statistical analysis, use the aggregate endpoint.
The query syntax for filtering is similar across all endpoints.
When querying records, we recommend that you use the hasData filter to ensure you only retrieve records that contain data in the container you’re querying. A record can exist in a stream but may not have data for every container. The hasData filter ensures efficient queries by excluding records that don’t have data in the selected container.
Time-based filtering with lastUpdatedTime

The lastUpdatedTime filter is mandatory for filter and aggregate queries on immutable streams. This filter defines the time range for retrieving records:
  • gt (greater than): The start of the time range (required for immutable streams)
  • lt (less than): The end of the time range (optional, defaults to current time if not specified)
The maximum time range you can query is defined by the stream’s maxFilteringInterval setting, which is determined by the stream template you chose when the stream was created. You can check the limit by retrieving the stream’s settings at /api/v1/projects/{project}/streams/{streamId} and inspecting settings.limits.maxFilteringInterval, as sketched below. For mutable streams, lastUpdatedTime is optional, but we recommend that you use it for improved query performance.
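For example, a minimal sketch of retrieving a stream’s definition to inspect its configured limits (the archive stream is used here; the exact shape of the settings object depends on the template):
from cognite.client import CogniteClient

client = CogniteClient()

stream_id = "alarm_data_archive_stream"

# Retrieve the stream definition; look for settings.limits.maxFilteringInterval in the response.
response = client.get(
    url=f"/api/v1/projects/{client.config.project}/streams/{stream_id}"
)
print(response.json())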
  • Use the filter endpoint for interactive applications, dashboards, and any scenario where you expect a relatively small, bounded result set. It returns an unpaginated result set with a maximum of 1,000 records per request and supports custom sorting on any property. This endpoint is ideal for queries like “show me the top 10 unacknowledged alarms sorted by priority.”
from cognite.client import CogniteClient
from datetime import datetime, timedelta
from tabulate import tabulate

client = CogniteClient()

space_id = "scada-alarms"
container_id = "alarm_events"
mutable_stream_id = "alarm_data_live_stream"

# Filter for unacknowledged alarms, sorted by priority descending
response = client.post(
    url=f"/api/v1/projects/{client.config.project}/streams/{mutable_stream_id}/records/filter",
    json={
        "lastUpdatedTime": {
            "gt": (datetime.now() - timedelta(days=1)).strftime("%Y-%m-%dT%H:%M:%S.00Z"),
        },
        "sources": [
            {
                "source": {
                    "type": "container",
                    "space": space_id,
                    "externalId": container_id,
                },
                "properties": ["*"],
            }
        ],
        "filter": {
            "and": [
                {
                    "hasData": [
                        {
                            "type": "container",
                            "space": space_id,
                            "externalId": container_id,
                        }
                    ]
                },
                {
                    "equals": {
                        "property": [space_id, container_id, "is_acknowledged"],
                        "value": False,
                    }
                },
            ]
        },
        "sort": [
            {
                "property": [space_id, container_id, "priority"],
                "direction": "descending",
            }
        ],
        "limit": 10,
    },
)

results = response.json()
items = results.get("items", [])

# Display as table
table_data = []
for item in items[:5]:  # Show first 5
    props = item["properties"][space_id][container_id]
    table_data.append([
        props["alarm_id"],
        props["severity"],
        props["value"],
        props["priority"],
        props["timestamp"],
    ])

print(f"Found {len(items)} unacknowledged alarms (showing first 5):")
print(tabulate(
    table_data,
    headers=["Alarm ID", "Severity", "Value", "Priority", "Timestamp"],
    tablefmt="github",
))
Expected output:
Found 10 unacknowledged alarms (showing first 5):
| Alarm ID        | Severity   |   Value |   Priority | Timestamp                |
|-----------------|------------|---------|------------|--------------------------|
| ALARM-TEMP-042  | CRITICAL   |   98.28 |        100 | 2025-10-24T12:04:54.000Z |
| ALARM-TEMP-042  | LOW        |   71.82 |        100 | 2025-10-24T12:44:54.000Z |
| ALARM-LEVEL-055 | LOW        |   69.19 |         99 | 2025-10-24T12:55:54.000Z |
| ALARM-FLOW-013  | LOW        |   56.17 |         97 | 2025-10-24T12:50:54.000Z |
| ALARM-FLOW-013  | HIGH       |  131.5  |         97 | 2025-10-24T11:45:54.000Z |
  • Use the sync endpoint for batch processing, data exports, or any application that needs to retrieve a large number of records efficiently. It returns a paginated result set: you iterate through the data by making subsequent requests with the cursor returned in the previous response, and the sync is complete when the API response has hasNext set to false (see the continuation sketch after the expected output below). The sync endpoint supports a maximum of 1,000 records per page and doesn’t support custom sorting. Read throughput is governed by the settings of the stream template you chose when creating the stream. This endpoint is designed for workflows like “export all recent alarm events for analysis” or for implementing incremental data pipelines.
from cognite.client import CogniteClient
from tabulate import tabulate

client = CogniteClient()

space_id = "scada-alarms"
container_id = "alarm_events"
mutable_stream_id = "alarm_data_live_stream"

# Sync records from the last 2 days
response = client.post(
    url=f"/api/v1/projects/{client.config.project}/streams/{mutable_stream_id}/records/sync",
    json={
        "sources": [
            {
                "source": {
                    "type": "container",
                    "space": space_id,
                    "externalId": container_id,
                },
                "properties": ["*"],
            }
        ],
        "filter": {
            "hasData": [
                {
                    "type": "container",
                    "space": space_id,
                    "externalId": container_id,
                }
            ]
        },
        "initializeCursor": "2d-ago",
        "limit": 10,
    },
)

results = response.json()
items = results.get("items", [])

# Display as table
table_data = []
for item in items[:5]:  # Show first 5
    props = item["properties"][space_id][container_id]
    ack_status = "ACK" if props["is_acknowledged"] else "UNACK"
    table_data.append([
        props["alarm_id"],
        props["severity"],
        ack_status,
        props["value"],
        props["timestamp"],
    ])

print(f"Sync returned {len(items)} records (showing first 5):")
print(tabulate(
    table_data,
    headers=["Alarm ID", "Severity", "Status", "Value", "Timestamp"],
    tablefmt="github",
))

if "nextCursor" in results:
    cursor = results["nextCursor"]
    cursor_preview = f"{cursor[:5]}..." if len(cursor) > 5 else cursor
    print(f"\nNext cursor for incremental sync: {cursor_preview}")
Expected output:
Sync returned 10 records (showing first 5):
| Alarm ID        | Severity   | Status   |   Value | Timestamp                |
|-----------------|------------|----------|---------|--------------------------|
| ALARM-PRESS-001 | CRITICAL   | UNACK    |  125.8  | 2025-10-22T10:00:00.000Z |
| ALARM-TEMP-042  | LOW        | UNACK    |   98.78 | 2025-10-24T11:27:54.000Z |
| ALARM-VIBR-088  | CRITICAL   | ACK      |  122.78 | 2025-10-24T11:28:54.000Z |
| ALARM-VIBR-088  | MEDIUM     | ACK      |  140.56 | 2025-10-24T11:29:54.000Z |
| ALARM-LEVEL-055 | CRITICAL   | ACK      |   50.59 | 2025-10-24T11:30:54.000Z |

Next cursor for incremental sync: 08c0e...
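To page through the full result set, feed the returned cursor back into subsequent sync requests until hasNext is false. A minimal continuation sketch, reusing the client, stream identifiers, and the first-page results and items variables from the example above:
# Continuation of the sync example above: page through the remaining records.
cursor = results.get("nextCursor")
has_next = results.get("hasNext", False)
total = len(items)

while has_next and cursor:
    page = client.post(
        url=f"/api/v1/projects/{client.config.project}/streams/{mutable_stream_id}/records/sync",
        json={
            "sources": [
                {
                    "source": {
                        "type": "container",
                        "space": space_id,
                        "externalId": container_id,
                    },
                    "properties": ["*"],
                }
            ],
            "filter": {
                "hasData": [
                    {
                        "type": "container",
                        "space": space_id,
                        "externalId": container_id,
                    }
                ]
            },
            "cursor": cursor,
            "limit": 10,
        },
    ).json()

    total += len(page.get("items", []))
    cursor = page.get("nextCursor")
    has_next = page.get("hasNext", False)

print(f"Synced {total} records in total")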
  • Use the aggregate endpoint to perform statistical analysis on your records, such as counts, averages, sums, and other aggregations. It returns aggregated results based on the specified aggregation functions and groupings, optimized for analytical queries that require summarizing large datasets without retrieving individual records. This endpoint is ideal for generating reports like “count of alarms by severity” or “average alarm value and priority statistics.”
from cognite.client import CogniteClient
from datetime import datetime, timedelta

client = CogniteClient()

space_id = "scada-alarms"
container_id = "alarm_events"
mutable_stream_id = "alarm_data_live_stream"

# Calculate overall statistics: count, average value, min/max values, average priority
response = client.post(
    url=f"/api/v1/projects/{client.config.project}/streams/{mutable_stream_id}/records/aggregate",
    json={
        "lastUpdatedTime": {
            "gt": (datetime.now() - timedelta(days=1)).strftime("%Y-%m-%dT%H:%M:%S.00Z"),
        },
        "filter": {
            "hasData": [
                {
                    "type": "container",
                    "space": space_id,
                    "externalId": container_id,
                }
            ]
        },
        "aggregates": {
            "total_count": {"count": {}},
            "avg_value": {"avg": {"property": [space_id, container_id, "value"]}},
            "min_value": {"min": {"property": [space_id, container_id, "value"]}},
            "max_value": {"max": {"property": [space_id, container_id, "value"]}},
            "avg_priority": {"avg": {"property": [space_id, container_id, "priority"]}},
        },
    },
)

results = response.json()
aggs = results["aggregates"]

print("Overall Statistics:")
print(f"  Total Records: {aggs['total_count']['count']}")
print(f"  Average Value: {aggs['avg_value']['avg']:.2f}")
print(f"  Min Value: {aggs['min_value']['min']:.2f}")
print(f"  Max Value: {aggs['max_value']['max']:.2f}")
print(f"  Average Priority: {aggs['avg_priority']['avg']:.1f}")
Expected output:
Overall Statistics:
  Total Records: 102
  Average Value: 95.87
  Min Value: 50.15
  Max Value: 149.39
  Average Priority: 63.0
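You can also group aggregations by property values. The next example groups records by alarm_id using a uniqueValues aggregation and, within each group, computes the average value and the distribution of severities with a nested uniqueValues aggregation: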
from cognite.client import CogniteClient
from datetime import datetime, timedelta
from tabulate import tabulate

client = CogniteClient()

space_id = "scada-alarms"
container_id = "alarm_events"
mutable_stream_id = "alarm_data_live_stream"

# Group by alarm_id and calculate statistics for each alarm
response = client.post(
    url=f"/api/v1/projects/{client.config.project}/streams/{mutable_stream_id}/records/aggregate",
    json={
        "lastUpdatedTime": {
            "gt": (datetime.now() - timedelta(days=1)).strftime("%Y-%m-%dT%H:%M:%S.00Z"),
        },
        "filter": {
            "hasData": [
                {
                    "type": "container",
                    "space": space_id,
                    "externalId": container_id,
                }
            ]
        },
        "aggregates": {
            "by_alarm": {
                "uniqueValues": {
                    "property": [space_id, container_id, "alarm_id"],
                    "aggregates": {
                        "avg_value": {
                            "avg": {"property": [space_id, container_id, "value"]}
                        },
                        "by_severity": {
                            "uniqueValues": {
                                "property": [space_id, container_id, "severity"]
                            }
                        },
                    },
                }
            }
        },
    },
)

results = response.json()
buckets = results["aggregates"]["by_alarm"]["uniqueValueBuckets"]

print("Per-Alarm Statistics:")
table_data = []
for bucket in buckets:
    alarm = bucket["value"]
    count = bucket["count"]
    avg_value = bucket["aggregates"]["avg_value"]["avg"]

    # Get most common severity from nested uniqueValues buckets
    severity_buckets = bucket["aggregates"]["by_severity"]["uniqueValueBuckets"]
    most_common_severity = (
        max(severity_buckets, key=lambda x: x["count"])["value"]
        if severity_buckets
        else "N/A"
    )

    table_data.append([alarm, count, f"{avg_value:.2f}", most_common_severity])

print(tabulate(
    table_data,
    headers=["Alarm ID", "Occurrences", "Avg Value", "Most Common Severity"],
    tablefmt="github",
))
Expected output:
Per-Alarm Statistics:
| Alarm ID        |   Occurrences |   Avg Value | Most Common Severity   |
|-----------------|---------------|-------------|------------------------|
| ALARM-FLOW-013  |            26 |       91.50 | LOW                    |
| ALARM-TEMP-042  |            24 |       92.15 | CRITICAL               |
| ALARM-PRESS-001 |            19 |      104.62 | CRITICAL               |
| ALARM-VIBR-088  |            17 |      106.93 | LOW                    |
| ALARM-LEVEL-055 |            16 |       86.39 | CRITICAL               |

Step 6: Archive acknowledged alarms (stream-to-stream pattern)

The final step of this tutorial is to implement a common Records pattern: moving data between streams based on lifecycle stages. You’ll archive acknowledged alarms by moving them from the mutable stream to the immutable stream. This stream-to-stream pattern is useful for:
  • Hot/cold storage: keep active data in mutable streams, archive historical data in immutable streams
  • Data lifecycle management: move processed records to long-term storage automatically
  • Performance optimization: keep active streams lean by moving old data to archives
The key to reliable stream-to-stream operations is using the sync endpoint with cursor-based pagination. The cursor tracks your position in the stream, allowing you to resume from where you left off if processing fails or is interrupted.
from cognite.client import CogniteClient

client = CogniteClient()

space_id = "scada-alarms"
container_id = "alarm_events"
mutable_stream_id = "alarm_data_live_stream"
immutable_stream_id = "alarm_data_archive_stream"

# In a production system, you would persist this cursor between runs
stored_cursor = None

# Pagination settings
BATCH_SIZE = 10
total_records_moved = 0
batch_number = 0

print("Starting pagination loop to archive acknowledged alarms...")

# Keep paginating until there are no more acknowledged records
while True:
    batch_number += 1
    print(f"\n--- Batch {batch_number} ---")

    # Sync acknowledged records from MUTABLE stream
    sync_response = client.post(
        url=f"/api/v1/projects/{client.config.project}/streams/{mutable_stream_id}/records/sync",
        json={
            "sources": [
                {
                    "source": {
                        "type": "container",
                        "space": space_id,
                        "externalId": container_id,
                    },
                    "properties": ["*"],
                }
            ],
            # Use stored cursor if available, otherwise initialize from 2 days ago
            **(
                {"cursor": stored_cursor}
                if stored_cursor
                else {"initializeCursor": "2d-ago"}
            ),
            "filter": {
                "and": [
                    {
                        "hasData": [
                            {
                                "type": "container",
                                "space": space_id,
                                "externalId": container_id,
                            }
                        ]
                    },
                    {
                        "equals": {
                            "property": [space_id, container_id, "is_acknowledged"],
                            "value": True,
                        }
                    },
                ]
            },
            "limit": BATCH_SIZE,
        },
    )

    sync_results = sync_response.json()
    acknowledged_records = sync_results.get("items", [])
    next_cursor = sync_results["nextCursor"]
    has_next = sync_results["hasNext"]

    print(f"Found {len(acknowledged_records)} acknowledged records in this batch")
    print(f"More data immediately available: {has_next}")

    # If no records found, we're done
    if len(acknowledged_records) == 0:
        print("No more acknowledged records to move")
        stored_cursor = next_cursor  # Update cursor even when no records found
        break

    # Prepare records for ingestion into immutable stream
    print(f"Ingesting {len(acknowledged_records)} records to IMMUTABLE stream...")

    immutable_items = []
    records_to_delete = []

    for record in acknowledged_records:
        # Extract properties and identifiers
        record_space = record["space"]
        record_external_id = record["externalId"]
        props = record["properties"][space_id][container_id]

        # Store record identifier for deletion
        records_to_delete.append({
            "space": record_space,
            "externalId": record_external_id,
        })

        # Prepare for immutable stream
        immutable_items.append({
            "space": record_space,
            "externalId": record_external_id,
            "sources": [
                {
                    "source": {
                        "type": "container",
                        "space": space_id,
                        "externalId": container_id,
                    },
                    "properties": props,
                }
            ],
        })

    # Ingest into immutable stream
    immutable_res = client.post(
        url=f"/api/v1/projects/{client.config.project}/streams/{immutable_stream_id}/records",
        json={"items": immutable_items},
    )

    # Check if ingest was successful (200/201: sync success, 202: async processing)
    if immutable_res.status_code not in [200, 201, 202]:
        print(f"Warning: Immutable ingest returned status {immutable_res.status_code}")
        print(f"Response: {immutable_res.text}")
        break
    else:
        print(f"✓ Ingested {len(immutable_items)} records to IMMUTABLE stream")

    # Delete acknowledged records from MUTABLE stream
    print(f"Deleting {len(records_to_delete)} records from MUTABLE stream...")

    delete_res = client.post(
        url=f"/api/v1/projects/{client.config.project}/streams/{mutable_stream_id}/records/delete",
        json={"items": records_to_delete},
    )
    print(f"✓ Deleted {len(records_to_delete)} records from MUTABLE stream")

    # IMPORTANT: Only update cursor after BOTH operations succeed
    stored_cursor = next_cursor
    total_records_moved += len(acknowledged_records)

    cursor_preview = f"{stored_cursor[:20]}..." if len(stored_cursor) > 20 else stored_cursor
    print(f"✓ Cursor updated: {cursor_preview}")
    print(f"Total records moved so far: {total_records_moved}")

print(f"\n{'=' * 70}")
print("PAGINATION COMPLETE")
print(f"{'=' * 70}")
print(f"Total batches processed: {batch_number}")
print(f"Total records moved: {total_records_moved}")
print(f"{'=' * 70}")
Expected output:
Starting pagination loop to archive acknowledged alarms...

--- Batch 1 ---
Found 10 acknowledged records in this batch
More data immediately available: True
Ingesting 10 records to IMMUTABLE stream...
✓ Ingested 10 records to IMMUTABLE stream
Deleting 10 records from MUTABLE stream...
✓ Deleted 10 records from MUTABLE stream
✓ Cursor updated: 08c0b89cecbeb1dab818...
Total records moved so far: 10

--- Batch 2 ---
Found 10 acknowledged records in this batch
More data immediately available: True
Ingesting 10 records to IMMUTABLE stream...
✓ Ingested 10 records to IMMUTABLE stream
Deleting 10 records from MUTABLE stream...
✓ Deleted 10 records from MUTABLE stream
✓ Cursor updated: 08c0aae0c3c0b1dab818...
Total records moved so far: 20

--- Batch 3 ---
Found 10 acknowledged records in this batch
More data immediately available: True
Ingesting 10 records to IMMUTABLE stream...
✓ Ingested 10 records to IMMUTABLE stream
Deleting 10 records from MUTABLE stream...
✓ Deleted 10 records from MUTABLE stream
✓ Cursor updated: 08c0e5b4cdc4b1dab818...
Total records moved so far: 30

[... batches 4-6 similar, each moving 10 records ...]

--- Batch 7 ---
Found 4 acknowledged records in this batch
More data immediately available: False
Ingesting 4 records to IMMUTABLE stream...
✓ Ingested 4 records to IMMUTABLE stream
Deleting 4 records from MUTABLE stream...
✓ Deleted 4 records from MUTABLE stream
✓ Cursor updated: 0880beeae1c9b1dab818...
Total records moved so far: 64

--- Batch 8 ---
Found 0 acknowledged records in this batch
More data immediately available: False
No more acknowledged records to move

======================================================================
PAGINATION COMPLETE
======================================================================
Total batches processed: 8
Total records moved: 64
======================================================================
After archiving, verify that the records were moved correctly by querying both streams to see the final distribution of acknowledged and unacknowledged alarms.
from cognite.client import CogniteClient
from datetime import datetime, timedelta

client = CogniteClient()

space_id = "scada-alarms"
container_id = "alarm_events"
mutable_stream_id = "alarm_data_live_stream"
immutable_stream_id = "alarm_data_archive_stream"

# Query mutable stream for distribution
mutable_response = client.post(
    url=f"/api/v1/projects/{client.config.project}/streams/{mutable_stream_id}/records/aggregate",
    json={
        "lastUpdatedTime": {
            "gt": (datetime.now() - timedelta(days=1)).strftime("%Y-%m-%dT%H:%M:%S.00Z"),
        },
        "filter": {
            "hasData": [
                {
                    "type": "container",
                    "space": space_id,
                    "externalId": container_id,
                }
            ]
        },
        "aggregates": {
            "total_count": {"count": {}},
            "by_acknowledged": {
                "uniqueValues": {
                    "property": [space_id, container_id, "is_acknowledged"]
                }
            },
        },
    },
)

mutable_results = mutable_response.json()
print("MUTABLE Stream Statistics:")
print(f"  Total Records: {mutable_results['aggregates']['total_count']['count']}")

mutable_buckets = mutable_results["aggregates"]["by_acknowledged"]["uniqueValueBuckets"]
for bucket in mutable_buckets:
    is_ack = bucket["value"] == "true" if isinstance(bucket["value"], str) else bucket["value"]
    ack_status = "Acknowledged (True)" if is_ack else "Unacknowledged (False)"
    print(f"  {ack_status}: {bucket['count']} records")

# Query immutable stream for distribution
immutable_response = client.post(
    url=f"/api/v1/projects/{client.config.project}/streams/{immutable_stream_id}/records/aggregate",
    json={
        "lastUpdatedTime": {
            "gt": (datetime.now() - timedelta(days=1)).strftime("%Y-%m-%dT%H:%M:%S.00Z"),
        },
        "filter": {
            "hasData": [
                {
                    "type": "container",
                    "space": space_id,
                    "externalId": container_id,
                }
            ]
        },
        "aggregates": {
            "total_count": {"count": {}},
            "by_acknowledged": {
                "uniqueValues": {
                    "property": [space_id, container_id, "is_acknowledged"]
                }
            },
        },
    },
)

immutable_results = immutable_response.json()
print("\nIMMUTABLE Stream Statistics:")
print(f"  Total Records: {immutable_results['aggregates']['total_count']['count']}")

immutable_buckets = immutable_results["aggregates"]["by_acknowledged"]["uniqueValueBuckets"]
for bucket in immutable_buckets:
    is_ack = bucket["value"] == "true" if isinstance(bucket["value"], str) else bucket["value"]
    ack_status = "Acknowledged (True)" if is_ack else "Unacknowledged (False)"
    print(f"  {ack_status}: {bucket['count']} records")
Expected output:
MUTABLE Stream Statistics:
  Total Records: 38
  Unacknowledged (False): 38 records

IMMUTABLE Stream Statistics:
  Total Records: 64
  Acknowledged (True): 64 records
This confirms that all 64 acknowledged alarms have been successfully moved from the mutable stream (which now contains only the remaining 38 unacknowledged alarms) to the immutable archive stream. Together, this accounts for the original 102 total records (38 + 64 = 102).
You’ve successfully built a complete SCADA alarm management pipeline using Records! You’ve created schemas, set up mutable and immutable streams, ingested alarm data, updated records, queried them using multiple endpoints, and implemented a stream-to-stream archiving pattern. This pipeline demonstrates the full lifecycle of managing high-volume OT event data with Records.

Advanced: Build a stream-to-stream transformation pipeline

A powerful architectural pattern for Records is creating data pipelines that move and transform data between streams. This separates raw data ingestion from creating clean, standardized datasets for consumption. You’ll build a three-stage pipeline: ingest raw data to a staging stream, transform it using a Hosted REST Extractor, and consume the standardized data from an archive stream.
The DataStaging stream template is planned for a future release. Until then, you can use the ImmutableTestStream template to test and implement this pattern.
To build a stream-to-stream transformation pipeline:

Step 1: Ingest raw data to a staging stream

Configure an extractor to write raw data from your source system into a staging stream. This stream should contain an exact, untransformed copy of the source data, preserving the original schema from the source system. Use the ImmutableTestStream template (or DataStaging when available) for this staging stream.
Expected outcome: Raw data from your source system is stored in a staging stream with the original schema intact.
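As a minimal sketch, you can create the staging stream the same way as the streams in step 2 of the main tutorial; the identifier raw_staging_stream is just an example name used here:
from cognite.client import CogniteClient

client = CogniteClient()

# Example staging stream using the ImmutableTestStream template until DataStaging is available.
staging_stream_id = "raw_staging_stream"
response = client.post(
    url=f"/api/v1/projects/{client.config.project}/streams",
    json={
        "items": [
            {
                "externalId": staging_stream_id,
                "settings": {"template": {"name": "ImmutableTestStream"}},
            }
        ]
    },
)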

Step 2: Transform and standardize data

Use the Hosted REST Extractor to create a stream-to-stream pipeline that transforms your staging data:
  1. Configure the extractor to continuously read from the staging stream’s /sync endpoint.
  2. Use the extractor’s built-in mapping capabilities to perform lightweight transformations.
  3. Write the transformed, standardized records to a permanent archive stream (for example, BasicArchive).
Expected outcome: Transformed, standardized records are written to your archive stream with a clean schema optimized for consumption.

Step 3: Consume from the archive stream

Point your applications, dashboards, and analytical jobs to the clean, standardized archive stream for reliable, high-performance consumption. This stream contains only the transformed data with your standardized schema.
Expected outcome: Your applications consume clean, standardized data from the archive stream without needing to handle raw source data transformations.
This composable architecture lets you ingest data once in its raw format for fidelity and improved data trust, then create streams containing standardized, use-case-specific record schemas from the source stream. This builds a robust and maintainable data processing ecosystem within CDF for your high-volume OT event and log data.

Create a transformation mapping

The mapping defines how to transform each input record from the source stream into the output format for the destination stream. The mapping receives each record as input and must produce an output object with type: "immutable_record" along with the target space, stream, and container sources.
{
    "space": "my_space",
    "externalId": concat("record_", input.id, "_", now()),
    "stream": "my_stream_id",
    "sources": [
        {
            "source": {
                "space": "schema_space",
                "externalId": "my_container"
            },
            "properties": {
                "value": input.temperature,
                "timestamp": to_unix_timestamp(input.datetime, "%Y-%m-%dT%H:%M:%S"),
                "sensorId": input.sensor.id,
                "location": input.sensor.location
            }
        }
    ],
    "type": "immutable_record"
}
The Hosted REST Extractor expects the space, container, and stream to have been created before the mapping can be successfully applied by the extractor job.