This hands-on guide teaches you the core patterns for working with Records in Cognite Data Fusion (CDF). You’ll learn how to create schemas, set up streams, ingest high-volume event data, query records efficiently, and build data pipelines between streams. Throughout this guide, SCADA alarm management serves as a practical example, but these patterns apply to any high-volume industrial data: multi-value sensor readings, event frames, equipment logs, telemetry events, and more.

The Records service is built specifically for ingesting billions of events and querying them efficiently. Records are designed primarily for immutable data like logs, events, and historical records that don’t change after ingestion. If your data needs updates during its lifecycle (like active alarms awaiting acknowledgment), you can use a mutable stream as a transitional stage, and then archive the finalized records to an immutable stream. For data that requires frequent updates and will never be archived, consider using data modeling nodes instead.

By the end of this guide, you’ll have a working example that handles the full lifecycle: ingesting data, updating records during their active phase, querying efficiently, and archiving completed records to immutable storage. If you’re new to Records concepts like spaces, containers, and streams, see Records and streams for background.

Prerequisites

  1. Configure access control for your user or service account.
  2. Python environment with cognite-sdk installed.
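If the SDK isn’t installed yet, it’s available on PyPI; the examples in this guide also use the tabulate package for table output:

pip install cognite-sdk tabulate

The following example creates a group with the data modeling, stream records, and streams capabilities used in this guide: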
from cognite.client import CogniteClient
from cognite.client.data_classes import GroupWrite
from cognite.client.data_classes.capabilities import (
    DataModelsAcl,
    StreamRecordsAcl,
    StreamsAcl,
)

client = CogniteClient()

my_group = client.iam.groups.create(
    GroupWrite(
        name="Records Access Group",
        capabilities=[
            DataModelsAcl(
                actions=[
                    DataModelsAcl.Action.Read,
                    DataModelsAcl.Action.Write
                ],
                scope=DataModelsAcl.Scope.All(),
            ),
            StreamRecordsAcl(
                actions=[
                    StreamRecordsAcl.Action.Read,
                    StreamRecordsAcl.Action.Write
                ],
                scope=StreamRecordsAcl.Scope.All(),
            ),
            StreamsAcl(
                actions=[
                    StreamsAcl.Action.Read,
                    StreamsAcl.Action.Create,
                    StreamsAcl.Action.Delete,
                ],
                scope=StreamsAcl.Scope.All(),
            ),
        ],
    )
)

Build your first Records pipeline

In this section, you’ll build a complete data pipeline using a SCADA alarm management scenario as an example. This pipeline demonstrates the full Records lifecycle: using a mutable stream as a temporary staging area for active data, ingesting and updating records, querying with different endpoints, and archiving finalized records to an immutable stream for permanent storage. This pattern mirrors what you’d deploy in production for managing any high-volume event data where records have a defined lifecycle.

Step 1: Create a space and define the alarm schema

Start by defining the schema for your alarm records. You’ll create a space to organize your data and a container that defines the structure of each alarm event. If you’re new to these concepts, check out the data modeling concepts for background.
  1. Create a space to serve as a namespace for your SCADA alarm data. This keeps your alarm schemas and records organized and separate from other data in your CDF project.
from cognite.client import CogniteClient
from cognite.client.data_classes.data_modeling import SpaceApply

client = CogniteClient()

space_id = "scada-alarms"
space = client.data_modeling.spaces.apply(
    SpaceApply(
        space=space_id,
        description="SCADA alarm events",
        name="SCADA Alarms",
    )
)
  2. Create the container that specifies what properties each alarm record will have. This schema includes the fields you’d typically see in SCADA systems: alarm identifiers, severity levels, numeric values, priorities, acknowledgment status, and timestamps.
When creating containers for records, you must set usedFor to "record". Records can only be ingested into containers specifically designated for records.
from cognite.client import CogniteClient
from cognite.client.data_classes.data_modeling import (
    ContainerApply,
    ContainerProperty,
    Text,
    Float64,
    Int64,
    Boolean,
    Timestamp,
)

client = CogniteClient()

container_id = "alarm_events"
container = client.data_modeling.containers.apply(
    ContainerApply(
        space="scada-alarms",
        external_id=container_id,
        name="SCADA Alarm Events",
        description="Container for SCADA alarm event data",
        used_for="record",
        properties={
            # Text property - alarm identifier
            "alarm_id": ContainerProperty(
                type=Text(is_list=False),
                nullable=False,
                name="Alarm ID",
                description="Unique identifier for the alarm",
            ),
            # Text property - alarm severity
            "severity": ContainerProperty(
                type=Text(is_list=False),
                nullable=False,
                name="Severity",
                description="Alarm severity level (CRITICAL, HIGH, MEDIUM, LOW)",
            ),
            # Float property - alarm value
            "value": ContainerProperty(
                type=Float64(is_list=False),
                nullable=False,
                name="Value",
                description="Numeric value associated with the alarm condition",
            ),
            # Int property - priority score
            "priority": ContainerProperty(
                type=Int64(is_list=False),
                nullable=False,
                name="Priority",
                description="Priority score (1-100)",
            ),
            # Boolean property - acknowledged status
            "is_acknowledged": ContainerProperty(
                type=Boolean(is_list=False),
                nullable=False,
                name="Acknowledged Status",
                description="True if alarm has been acknowledged",
            ),
            # Timestamp property - when alarm occurred
            "timestamp": ContainerProperty(
                type=Timestamp(is_list=False),
                nullable=False,
                name="Timestamp",
                description="Time when the alarm occurred",
            ),
        },
    )
)

Step 2: Create streams for different lifecycles

Now create the streams that will hold your alarm records. Streams need a stream settings template that defines their behavior, performance characteristics, and lifecycle policies. For this pipeline, you’ll create two streams to handle different stages of the alarm lifecycle. Choose your stream template based on data lifecycle and throughput requirements, not by record type or source system. For detailed specifications of each template’s limits, see the Streams API documentation.
  • Create a mutable stream for active alarms that need real-time updates, like when an operator acknowledges an alarm. This stream uses the BasicLiveData template, which is optimized for data that changes frequently.
from cognite.client import CogniteClient

client = CogniteClient()

mutable_stream_id = "alarm_data_live_stream"
response = client.post(
    url=f"/api/v1/projects/{client.config.project}/streams",
    json={
        "items": [
            {
                "externalId": mutable_stream_id,
                "settings": {"template": {"name": "BasicLiveData"}},
            }
        ]
    }
)
  • Create an immutable stream for archived alarms that won’t change after they’re written. This stream uses the ImmutableTestStream template, which is designed for write-once, read-many scenarios like long-term archival.
from cognite.client import CogniteClient

client = CogniteClient()

immutable_stream_id = "alarm_data_archive_stream"
response = client.post(
    url=f"/api/v1/projects/{client.config.project}/streams",
    json={
        "items": [
            {
                "externalId": immutable_stream_id,
                "settings": {"template": {"name": "ImmutableTestStream"}},
            }
        ]
    }
)

Step 3: Ingest records

Now you’ll ingest alarm records into your mutable stream. In production, you’d typically use a Cognite extractor that reads directly from your source system and writes to Records. For this tutorial, you’ll use the API directly to simulate the ingestion process.
The Records service does not support onboarding data using RAW and Transformations. Always ingest data directly to a Records stream using one of the supported extractors.
When writing records to the API, you need to specify the target stream identifier and include the container’s space.externalId to identify which schema the data follows.
  1. Start by ingesting a couple of hardcoded alarm records to verify your setup works. This gives you a small dataset to test queries and updates before adding more data.
from cognite.client import CogniteClient
import uuid

client = CogniteClient()

space_id = "scada-alarms"
container_id = "alarm_events"
mutable_stream_id = "alarm_data_live_stream"

# Create two hardcoded alarm records
hardcoded_records = [
    {
        "alarm_id": "ALARM-PRESS-001",
        "severity": "CRITICAL",
        "value": 125.8,
        "priority": 95,
        "is_acknowledged": False,
        "timestamp": "2025-10-22T10:00:00.000Z",
    },
    {
        "alarm_id": "ALARM-TEMP-042",
        "severity": "MEDIUM",
        "value": 88.3,
        "priority": 60,
        "is_acknowledged": False,
        "timestamp": "2025-10-22T10:00:00.000Z",
    },
]

# Prepare records for ingestion
items = []
for record in hardcoded_records:
    ext_id = str(uuid.uuid4())
    items.append(
        {
            "space": space_id,
            "externalId": ext_id,
            "sources": [
                {
                    "source": {
                        "type": "container",
                        "space": space_id,
                        "externalId": container_id,
                    },
                    "properties": record,
                }
            ],
        }
    )

# Ingest hardcoded records
response = client.post(
    url=f"/api/v1/projects/{client.config.project}/streams/{mutable_stream_id}/records",
    json={"items": items},
)
  2. Add 100 random alarm records to simulate a realistic dataset. This larger dataset will make your queries and aggregations more meaningful and help you see how Records handles volume.
from cognite.client import CogniteClient
from datetime import datetime, timedelta
import random
import uuid

client = CogniteClient()

space_id = "scada-alarms"
container_id = "alarm_events"
mutable_stream_id = "alarm_data_live_stream"

# Generate 100 random alarm records
alarm_ids = [
    "ALARM-PRESS-001",
    "ALARM-TEMP-042",
    "ALARM-FLOW-013",
    "ALARM-VIBR-088",
    "ALARM-LEVEL-055",
]
severities = ["CRITICAL", "HIGH", "MEDIUM", "LOW"]
base_time = datetime.now() - timedelta(hours=2)

random_records = [
    {
        "alarm_id": random.choice(alarm_ids),
        "severity": random.choice(severities),
        "value": round(random.uniform(50.0, 150.0), 2),
        "priority": random.randint(20, 100),
        "is_acknowledged": random.choice([True, False]),
        "timestamp": (base_time + timedelta(minutes=i)).strftime("%Y-%m-%dT%H:%M:%S.000Z"),
    }
    for i in range(100)
]

# Prepare random records for ingestion
items = []
for record in random_records:
    items.append(
        {
            "space": space_id,
            "externalId": str(uuid.uuid4()),
            "sources": [
                {
                    "source": {
                        "type": "container",
                        "space": space_id,
                        "externalId": container_id,
                    },
                    "properties": record,
                }
            ],
        }
    )

# Ingest random records
response = client.post(
    url=f"/api/v1/projects/{client.config.project}/streams/{mutable_stream_id}/records",
    json={"items": items},
)

Step 4: Update records (mutable streams only)

One advantage of mutable streams is the ability to update records after they’re written. This is essential for scenarios like acknowledging alarms when an operator responds, updating status fields as conditions change, or correcting data errors. Use the upsert endpoint to update existing records by their space and externalId. To update a record, you need its externalId. In practice, you’d typically get this from a previous query or store it when you first ingest the record. For this example, you’ll update one of the alarm records you ingested earlier to mark it as acknowledged.
from cognite.client import CogniteClient

client = CogniteClient()

space_id = "scada-alarms"
container_id = "alarm_events"
mutable_stream_id = "alarm_data_live_stream"

# Assuming you have the external_id of a record you want to update
# In practice, you would get this from a previous query or store it during ingestion
record_external_id = "your-record-external-id"  # Replace with actual ID

# Update the record to set is_acknowledged to True
updated_record = {
    "alarm_id": "ALARM-TEMP-042",
    "severity": "MEDIUM",
    "value": 88.3,
    "priority": 60,
    "is_acknowledged": True,
    "timestamp": "2025-10-22T10:00:00.000Z",
}

response = client.post(
    url=f"/api/v1/projects/{client.config.project}/streams/{mutable_stream_id}/records/upsert",
    json={
        "items": [
            {
                "space": space_id,
                "externalId": record_external_id,
                "sources": [
                    {
                        "source": {
                            "type": "container",
                            "space": space_id,
                            "externalId": container_id,
                        },
                        "properties": updated_record,
                    }
                ],
            }
        ]
    },
)
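If you don’t have an externalId at hand, here’s a minimal sketch of looking one up with the filter endpoint (covered in more detail in the next step), using the alarm data ingested earlier:

from cognite.client import CogniteClient

client = CogniteClient()

space_id = "scada-alarms"
container_id = "alarm_events"
mutable_stream_id = "alarm_data_live_stream"

# Find one record for ALARM-TEMP-042 and capture its externalId for the upsert call
response = client.post(
    url=f"/api/v1/projects/{client.config.project}/streams/{mutable_stream_id}/records/filter",
    json={
        "sources": [
            {
                "source": {
                    "type": "container",
                    "space": space_id,
                    "externalId": container_id,
                },
                "properties": ["*"],
            }
        ],
        "filter": {
            "equals": {
                "property": [space_id, container_id, "alarm_id"],
                "value": "ALARM-TEMP-042",
            }
        },
        "limit": 1,
    },
)

items = response.json().get("items", [])
if items:
    record_external_id = items[0]["externalId"]
    print(f"externalId to update: {record_external_id}")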

Step 5: Query records

Next, you’ll query your alarm records using the Records API. The API provides three endpoints for consuming data, each optimized for different use cases:
  • For interactive queries, use the filter endpoint.
  • For large datasets with pagination, use the sync endpoint.
  • For statistical analysis, use the aggregate endpoint.
The query syntax for filtering is similar across all endpoints.
When querying records, we recommend that you use the hasData filter to ensure you only retrieve records that contain data in the container you’re querying. A record can exist in a stream but may not have data for every container. The hasData filter ensures efficient queries by excluding records that don’t have data in the selected container.

Time-based filtering with lastUpdatedTime

The lastUpdatedTime filter is mandatory for filter and aggregate queries on immutable streams. This filter defines the time range for retrieving records:
  • gt (greater than): The start of the time range (required for immutable streams)
  • lt (less than): The end of the time range (optional, defaults to current time if not specified)
The maximum time range you can query is defined by the stream’s maxFilteringInterval setting, which is determined by the stream template you chose when the stream was created. You can check the limit by retrieving the stream’s settings at /api/v1/projects/{project}/streams/{streamId} and inspecting settings.limits.maxFilteringInterval. For mutable streams, lastUpdatedTime is optional, but we recommend that you use it for improved query performance.
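For reference, here is a minimal sketch of checking this limit on the live stream, assuming the single-stream GET returns the stream object with its settings as described above (the exact response shape may vary):

from cognite.client import CogniteClient

client = CogniteClient()

stream_id = "alarm_data_live_stream"

# Retrieve the stream definition and inspect its filtering limit
response = client.get(
    url=f"/api/v1/projects/{client.config.project}/streams/{stream_id}",
)
stream = response.json()
max_interval = stream.get("settings", {}).get("limits", {}).get("maxFilteringInterval")
print(f"maxFilteringInterval for {stream_id}: {max_interval}")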
  • Use the filter endpoint for interactive applications, dashboards, and any scenario where you expect a relatively small, bounded result set. It returns an unpaginated result set with a maximum of 1,000 records per request and supports custom sorting on any property. This endpoint is ideal for queries like “show me the top 10 unacknowledged alarms sorted by priority.”
from cognite.client import CogniteClient
from datetime import datetime, timedelta
from tabulate import tabulate

client = CogniteClient()

space_id = "scada-alarms"
container_id = "alarm_events"
mutable_stream_id = "alarm_data_live_stream"

# Filter for unacknowledged alarms, sorted by priority descending
response = client.post(
    url=f"/api/v1/projects/{client.config.project}/streams/{mutable_stream_id}/records/filter",
    json={
        "lastUpdatedTime": {
            "gt": (datetime.now() - timedelta(days=1)).strftime("%Y-%m-%dT%H:%M:%S.00Z"),
        },
        "sources": [
            {
                "source": {
                    "type": "container",
                    "space": space_id,
                    "externalId": container_id,
                },
                "properties": ["*"],
            }
        ],
        "filter": {
            "and": [
                {
                    "hasData": [
                        {
                            "type": "container",
                            "space": space_id,
                            "externalId": container_id,
                        }
                    ]
                },
                {
                    "equals": {
                        "property": [space_id, container_id, "is_acknowledged"],
                        "value": False,
                    }
                },
            ]
        },
        "sort": [
            {
                "property": [space_id, container_id, "priority"],
                "direction": "descending",
            }
        ],
        "limit": 10,
    },
)

results = response.json()
items = results.get("items", [])

# Display as table
table_data = []
for item in items[:5]:  # Show first 5
    props = item["properties"][space_id][container_id]
    table_data.append([
        props["alarm_id"],
        props["severity"],
        props["value"],
        props["priority"],
        props["timestamp"],
    ])

print(f"Found {len(items)} unacknowledged alarms (showing first 5):")
print(tabulate(
    table_data,
    headers=["Alarm ID", "Severity", "Value", "Priority", "Timestamp"],
    tablefmt="github",
))
Expected output:
Found 10 unacknowledged alarms (showing first 5):
| Alarm ID        | Severity   |   Value |   Priority | Timestamp                |
|-----------------|------------|---------|------------|--------------------------|
| ALARM-TEMP-042  | CRITICAL   |   98.28 |        100 | 2025-10-24T12:04:54.000Z |
| ALARM-TEMP-042  | LOW        |   71.82 |        100 | 2025-10-24T12:44:54.000Z |
| ALARM-LEVEL-055 | LOW        |   69.19 |         99 | 2025-10-24T12:55:54.000Z |
| ALARM-FLOW-013  | LOW        |   56.17 |         97 | 2025-10-24T12:50:54.000Z |
| ALARM-FLOW-013  | HIGH       |  131.5  |         97 | 2025-10-24T11:45:54.000Z |
  • Use the sync endpoint for batch processing, data exports, or any application that needs to retrieve a large number of records efficiently. It returns a paginated result set—you iterate through the data by making subsequent requests with the cursor returned in the previous response. The sync process is complete when the API response has hasNext set to false. The sync endpoint supports a maximum of 1,000 records per page and doesn’t support custom sorting. The read throughput is governed by the stream settings defined by the stream template you chose when creating the stream. This endpoint is designed for workflows like “export all recent alarm events for analysis” or for implementing incremental data pipelines.
from cognite.client import CogniteClient
from tabulate import tabulate

client = CogniteClient()

space_id = "scada-alarms"
container_id = "alarm_events"
mutable_stream_id = "alarm_data_live_stream"

# Sync records from the last 2 days
response = client.post(
    url=f"/api/v1/projects/{client.config.project}/streams/{mutable_stream_id}/records/sync",
    json={
        "sources": [
            {
                "source": {
                    "type": "container",
                    "space": space_id,
                    "externalId": container_id,
                },
                "properties": ["*"],
            }
        ],
        "filter": {
            "hasData": [
                {
                    "type": "container",
                    "space": space_id,
                    "externalId": container_id,
                }
            ]
        },
        "initializeCursor": "2d-ago",
        "limit": 10,
    },
)

results = response.json()
items = results.get("items", [])

# Display as table
table_data = []
for item in items[:5]:  # Show first 5
    props = item["properties"][space_id][container_id]
    ack_status = "ACK" if props["is_acknowledged"] else "UNACK"
    table_data.append([
        props["alarm_id"],
        props["severity"],
        ack_status,
        props["value"],
        props["timestamp"],
    ])

print(f"Sync returned {len(items)} records (showing first 5):")
print(tabulate(
    table_data,
    headers=["Alarm ID", "Severity", "Status", "Value", "Timestamp"],
    tablefmt="github",
))

if "nextCursor" in results:
    cursor = results["nextCursor"]
    cursor_preview = f"{cursor[:5]}..." if len(cursor) > 5 else cursor
    print(f"\nNext cursor for incremental sync: {cursor_preview}")
Expected output:
Sync returned 10 records (showing first 5):
| Alarm ID        | Severity   | Status   |   Value | Timestamp                |
|-----------------|------------|----------|---------|--------------------------|
| ALARM-PRESS-001 | CRITICAL   | UNACK    |  125.8  | 2025-10-22T10:00:00.000Z |
| ALARM-TEMP-042  | LOW        | UNACK    |   98.78 | 2025-10-24T11:27:54.000Z |
| ALARM-VIBR-088  | CRITICAL   | ACK      |  122.78 | 2025-10-24T11:28:54.000Z |
| ALARM-VIBR-088  | MEDIUM     | ACK      |  140.56 | 2025-10-24T11:29:54.000Z |
| ALARM-LEVEL-055 | CRITICAL   | ACK      |   50.59 | 2025-10-24T11:30:54.000Z |

Next cursor for incremental sync: 08c0e...
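To consume every page, keep calling the sync endpoint with the cursor from the previous response until hasNext is false. A minimal sketch of that loop, reusing the request body shown above with a larger page size:

from cognite.client import CogniteClient

client = CogniteClient()

space_id = "scada-alarms"
container_id = "alarm_events"
mutable_stream_id = "alarm_data_live_stream"

# Base request body; the first call initializes the cursor, later calls pass it back
body = {
    "sources": [
        {
            "source": {
                "type": "container",
                "space": space_id,
                "externalId": container_id,
            },
            "properties": ["*"],
        }
    ],
    "filter": {
        "hasData": [
            {
                "type": "container",
                "space": space_id,
                "externalId": container_id,
            }
        ]
    },
    "initializeCursor": "2d-ago",
    "limit": 1000,
}

total = 0
while True:
    results = client.post(
        url=f"/api/v1/projects/{client.config.project}/streams/{mutable_stream_id}/records/sync",
        json=body,
    ).json()
    total += len(results.get("items", []))
    if not results.get("hasNext"):
        break
    # Continue from the returned cursor instead of re-initializing
    body.pop("initializeCursor", None)
    body["cursor"] = results["nextCursor"]

print(f"Synced {total} records in total")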
  • Use the aggregate endpoint to perform statistical analysis on your records, such as counts, averages, sums, and other aggregations. It returns aggregated results based on the specified aggregation functions and groupings, optimized for analytical queries that require summarizing large datasets without retrieving individual records. This endpoint is ideal for generating reports like “count of alarms by severity” or “average alarm value and priority statistics.”
from cognite.client import CogniteClient
from datetime import datetime, timedelta

client = CogniteClient()

space_id = "scada-alarms"
container_id = "alarm_events"
mutable_stream_id = "alarm_data_live_stream"

# Calculate overall statistics: count, average value, min/max values, average priority
response = client.post(
    url=f"/api/v1/projects/{client.config.project}/streams/{mutable_stream_id}/records/aggregate",
    json={
        "lastUpdatedTime": {
            "gt": (datetime.now() - timedelta(days=1)).strftime("%Y-%m-%dT%H:%M:%S.00Z"),
        },
        "filter": {
            "hasData": [
                {
                    "type": "container",
                    "space": space_id,
                    "externalId": container_id,
                }
            ]
        },
        "aggregates": {
            "total_count": {"count": {}},
            "avg_value": {"avg": {"property": [space_id, container_id, "value"]}},
            "min_value": {"min": {"property": [space_id, container_id, "value"]}},
            "max_value": {"max": {"property": [space_id, container_id, "value"]}},
            "avg_priority": {"avg": {"property": [space_id, container_id, "priority"]}},
        },
    },
)

results = response.json()
aggs = results["aggregates"]

print("Overall Statistics:")
print(f"  Total Records: {aggs['total_count']['count']}")
print(f"  Average Value: {aggs['avg_value']['avg']:.2f}")
print(f"  Min Value: {aggs['min_value']['min']:.2f}")
print(f"  Max Value: {aggs['max_value']['max']:.2f}")
print(f"  Average Priority: {aggs['avg_priority']['avg']:.1f}")
Expected output:
Overall Statistics:
  Total Records: 102
  Average Value: 95.87
  Min Value: 50.15
  Max Value: 149.39
  Average Priority: 63.0
from cognite.client import CogniteClient
from datetime import datetime, timedelta
from tabulate import tabulate

client = CogniteClient()

space_id = "scada-alarms"
container_id = "alarm_events"
mutable_stream_id = "alarm_data_live_stream"

# Group by alarm_id and calculate statistics for each alarm
response = client.post(
    url=f"/api/v1/projects/{client.config.project}/streams/{mutable_stream_id}/records/aggregate",
    json={
        "lastUpdatedTime": {
            "gt": (datetime.now() - timedelta(days=1)).strftime("%Y-%m-%dT%H:%M:%S.00Z"),
        },
        "filter": {
            "hasData": [
                {
                    "type": "container",
                    "space": space_id,
                    "externalId": container_id,
                }
            ]
        },
        "aggregates": {
            "by_alarm": {
                "uniqueValues": {
                    "property": [space_id, container_id, "alarm_id"],
                    "aggregates": {
                        "avg_value": {
                            "avg": {"property": [space_id, container_id, "value"]}
                        },
                        "by_severity": {
                            "uniqueValues": {
                                "property": [space_id, container_id, "severity"]
                            }
                        },
                    },
                }
            }
        },
    },
)

results = response.json()
buckets = results["aggregates"]["by_alarm"]["uniqueValueBuckets"]

print("Per-Alarm Statistics:")
table_data = []
for bucket in buckets:
    alarm = bucket["value"]
    count = bucket["count"]
    avg_value = bucket["aggregates"]["avg_value"]["avg"]

    # Get most common severity from nested uniqueValues buckets
    severity_buckets = bucket["aggregates"]["by_severity"]["uniqueValueBuckets"]
    most_common_severity = (
        max(severity_buckets, key=lambda x: x["count"])["value"]
        if severity_buckets
        else "N/A"
    )

    table_data.append([alarm, count, f"{avg_value:.2f}", most_common_severity])

print(tabulate(
    table_data,
    headers=["Alarm ID", "Occurrences", "Avg Value", "Most Common Severity"],
    tablefmt="github",
))
Expected output:
Per-Alarm Statistics:
| Alarm ID        |   Occurrences |   Avg Value | Most Common Severity   |
|-----------------|---------------|-------------|------------------------|
| ALARM-FLOW-013  |            26 |       91.50 | LOW                    |
| ALARM-TEMP-042  |            24 |       92.15 | CRITICAL               |
| ALARM-PRESS-001 |            19 |      104.62 | CRITICAL               |
| ALARM-VIBR-088  |            17 |      106.93 | LOW                    |
| ALARM-LEVEL-055 |            16 |       86.39 | CRITICAL               |

Step 6: Archive acknowledged alarms (stream-to-stream pattern)

The final step of this tutorial is to implement a common Records pattern: moving data between streams based on lifecycle stages. You’ll archive acknowledged alarms by moving them from the mutable stream to the immutable stream. This stream-to-stream pattern is useful for:
  • Hot/cold storage: keep active data in mutable streams, archive historical data in immutable streams
  • Data lifecycle management: move processed records to long-term storage automatically
  • Performance optimization: keep active streams lean by moving old data to archives
The key to reliable stream-to-stream operations is using the sync endpoint with cursor-based pagination. The cursor tracks your position in the stream, allowing you to resume from where you left off if processing fails or is interrupted.
from cognite.client import CogniteClient

client = CogniteClient()

space_id = "scada-alarms"
container_id = "alarm_events"
mutable_stream_id = "alarm_data_live_stream"
immutable_stream_id = "alarm_data_archive_stream"

# In a production system, you would persist this cursor between runs
stored_cursor = None

# Pagination settings
BATCH_SIZE = 10
total_records_moved = 0
batch_number = 0

print("Starting pagination loop to archive acknowledged alarms...")

# Keep paginating until there are no more acknowledged records
while True:
    batch_number += 1
    print(f"\n--- Batch {batch_number} ---")

    # Sync acknowledged records from MUTABLE stream
    sync_response = client.post(
        url=f"/api/v1/projects/{client.config.project}/streams/{mutable_stream_id}/records/sync",
        json={
            "sources": [
                {
                    "source": {
                        "type": "container",
                        "space": space_id,
                        "externalId": container_id,
                    },
                    "properties": ["*"],
                }
            ],
            # Use stored cursor if available, otherwise initialize from 2 days ago
            **(
                {"cursor": stored_cursor}
                if stored_cursor
                else {"initializeCursor": "2d-ago"}
            ),
            "filter": {
                "and": [
                    {
                        "hasData": [
                            {
                                "type": "container",
                                "space": space_id,
                                "externalId": container_id,
                            }
                        ]
                    },
                    {
                        "equals": {
                            "property": [space_id, container_id, "is_acknowledged"],
                            "value": True,
                        }
                    },
                ]
            },
            "limit": BATCH_SIZE,
        },
    )

    sync_results = sync_response.json()
    acknowledged_records = sync_results.get("items", [])
    next_cursor = sync_results["nextCursor"]
    has_next = sync_results["hasNext"]

    print(f"Found {len(acknowledged_records)} acknowledged records in this batch")
    print(f"More data immediately available: {has_next}")

    # If no records found, we're done
    if len(acknowledged_records) == 0:
        print("No more acknowledged records to move")
        stored_cursor = next_cursor  # Update cursor even when no records found
        break

    # Prepare records for ingestion into immutable stream
    print(f"Ingesting {len(acknowledged_records)} records to IMMUTABLE stream...")

    immutable_items = []
    records_to_delete = []

    for record in acknowledged_records:
        # Extract properties and identifiers
        record_space = record["space"]
        record_external_id = record["externalId"]
        props = record["properties"][space_id][container_id]

        # Store record identifier for deletion
        records_to_delete.append({
            "space": record_space,
            "externalId": record_external_id,
        })

        # Prepare for immutable stream
        immutable_items.append({
            "space": record_space,
            "externalId": record_external_id,
            "sources": [
                {
                    "source": {
                        "type": "container",
                        "space": space_id,
                        "externalId": container_id,
                    },
                    "properties": props,
                }
            ],
        })

    # Ingest into immutable stream
    immutable_res = client.post(
        url=f"/api/v1/projects/{client.config.project}/streams/{immutable_stream_id}/records",
        json={"items": immutable_items},
    )

    # Check if ingest was successful (200/201: sync success, 202: async processing)
    if immutable_res.status_code not in [200, 201, 202]:
        print(f"Warning: Immutable ingest returned status {immutable_res.status_code}")
        print(f"Response: {immutable_res.text}")
        break
    else:
        print(f"✓ Ingested {len(immutable_items)} records to IMMUTABLE stream")

    # Delete acknowledged records from MUTABLE stream
    print(f"Deleting {len(records_to_delete)} records from MUTABLE stream...")

    delete_res = client.post(
        url=f"/api/v1/projects/{client.config.project}/streams/{mutable_stream_id}/records/delete",
        json={"items": records_to_delete},
    )
    print(f"✓ Deleted {len(records_to_delete)} records from MUTABLE stream")

    # IMPORTANT: Only update cursor after BOTH operations succeed
    stored_cursor = next_cursor
    total_records_moved += len(acknowledged_records)

    cursor_preview = f"{stored_cursor[:20]}..." if len(stored_cursor) > 20 else stored_cursor
    print(f"✓ Cursor updated: {cursor_preview}")
    print(f"Total records moved so far: {total_records_moved}")

print(f"\n{'=' * 70}")
print("PAGINATION COMPLETE")
print(f"{'=' * 70}")
print(f"Total batches processed: {batch_number}")
print(f"Total records moved: {total_records_moved}")
print(f"{'=' * 70}")
Expected output:
Starting pagination loop to archive acknowledged alarms...

--- Batch 1 ---
Found 10 acknowledged records in this batch
More data immediately available: True
Ingesting 10 records to IMMUTABLE stream...
✓ Ingested 10 records to IMMUTABLE stream
Deleting 10 records from MUTABLE stream...
✓ Deleted 10 records from MUTABLE stream
✓ Cursor updated: 08c0b89cecbeb1dab818...
Total records moved so far: 10

--- Batch 2 ---
Found 10 acknowledged records in this batch
More data immediately available: True
Ingesting 10 records to IMMUTABLE stream...
✓ Ingested 10 records to IMMUTABLE stream
Deleting 10 records from MUTABLE stream...
✓ Deleted 10 records from MUTABLE stream
✓ Cursor updated: 08c0aae0c3c0b1dab818...
Total records moved so far: 20

--- Batch 3 ---
Found 10 acknowledged records in this batch
More data immediately available: True
Ingesting 10 records to IMMUTABLE stream...
✓ Ingested 10 records to IMMUTABLE stream
Deleting 10 records from MUTABLE stream...
✓ Deleted 10 records from MUTABLE stream
✓ Cursor updated: 08c0e5b4cdc4b1dab818...
Total records moved so far: 30

[... batches 4-6 similar, each moving 10 records ...]

--- Batch 7 ---
Found 4 acknowledged records in this batch
More data immediately available: False
Ingesting 4 records to IMMUTABLE stream...
✓ Ingested 4 records to IMMUTABLE stream
Deleting 4 records from MUTABLE stream...
✓ Deleted 4 records from MUTABLE stream
✓ Cursor updated: 0880beeae1c9b1dab818...
Total records moved so far: 64

--- Batch 8 ---
Found 0 acknowledged records in this batch
More data immediately available: False
No more acknowledged records to move

======================================================================
PAGINATION COMPLETE
======================================================================
Total batches processed: 8
Total records moved: 64
======================================================================
After archiving, verify that the records were moved correctly by querying both streams to see the final distribution of acknowledged and unacknowledged alarms.
from cognite.client import CogniteClient
from datetime import datetime, timedelta

client = CogniteClient()

space_id = "scada-alarms"
container_id = "alarm_events"
mutable_stream_id = "alarm_data_live_stream"
immutable_stream_id = "alarm_data_archive_stream"

# Query mutable stream for distribution
mutable_response = client.post(
    url=f"/api/v1/projects/{client.config.project}/streams/{mutable_stream_id}/records/aggregate",
    json={
        "lastUpdatedTime": {
            "gt": (datetime.now() - timedelta(days=1)).strftime("%Y-%m-%dT%H:%M:%S.00Z"),
        },
        "filter": {
            "hasData": [
                {
                    "type": "container",
                    "space": space_id,
                    "externalId": container_id,
                }
            ]
        },
        "aggregates": {
            "total_count": {"count": {}},
            "by_acknowledged": {
                "uniqueValues": {
                    "property": [space_id, container_id, "is_acknowledged"]
                }
            },
        },
    },
)

mutable_results = mutable_response.json()
print("MUTABLE Stream Statistics:")
print(f"  Total Records: {mutable_results['aggregates']['total_count']['count']}")

mutable_buckets = mutable_results["aggregates"]["by_acknowledged"]["uniqueValueBuckets"]
for bucket in mutable_buckets:
    is_ack = bucket["value"] == "true" if isinstance(bucket["value"], str) else bucket["value"]
    ack_status = "Acknowledged (True)" if is_ack else "Unacknowledged (False)"
    print(f"  {ack_status}: {bucket['count']} records")

# Query immutable stream for distribution
immutable_response = client.post(
    url=f"/api/v1/projects/{client.config.project}/streams/{immutable_stream_id}/records/aggregate",
    json={
        "lastUpdatedTime": {
            "gt": (datetime.now() - timedelta(days=1)).strftime("%Y-%m-%dT%H:%M:%S.00Z"),
        },
        "filter": {
            "hasData": [
                {
                    "type": "container",
                    "space": space_id,
                    "externalId": container_id,
                }
            ]
        },
        "aggregates": {
            "total_count": {"count": {}},
            "by_acknowledged": {
                "uniqueValues": {
                    "property": [space_id, container_id, "is_acknowledged"]
                }
            },
        },
    },
)

immutable_results = immutable_response.json()
print("\nIMMUTABLE Stream Statistics:")
print(f"  Total Records: {immutable_results['aggregates']['total_count']['count']}")

immutable_buckets = immutable_results["aggregates"]["by_acknowledged"]["uniqueValueBuckets"]
for bucket in immutable_buckets:
    is_ack = bucket["value"] == "true" if isinstance(bucket["value"], str) else bucket["value"]
    ack_status = "Acknowledged (True)" if is_ack else "Unacknowledged (False)"
    print(f"  {ack_status}: {bucket['count']} records")
Expected output:
MUTABLE Stream Statistics:
  Total Records: 38
  Unacknowledged (False): 38 records

IMMUTABLE Stream Statistics:
  Total Records: 64
  Acknowledged (True): 64 records
This confirms that all 64 acknowledged alarms have been successfully moved from the mutable stream (which now contains only the remaining 38 unacknowledged alarms) to the immutable archive stream. Together, this accounts for the original 102 total records (38 + 64 = 102).
You’ve successfully built a complete SCADA alarm management pipeline using Records! You’ve created schemas, set up mutable and immutable streams, ingested alarm data, updated records, queried them using multiple endpoints, and implemented a stream-to-stream archiving pattern. This pipeline demonstrates the full lifecycle of managing high-volume OT event data with Records.

Advanced: Build a stream-to-stream transformation pipeline

A powerful architectural pattern for Records is creating data pipelines that move and transform data between streams. This separates raw data ingestion from creating clean, standardized datasets for consumption. You’ll build a three-stage pipeline: ingest raw data to a source stream, transform it using a Hosted REST Extractor, and consume the standardized data from a destination stream.
The DataStaging stream template is planned for a future release. Until then, you can use the ImmutableTestStream template to test and implement this pattern.
To build a stream-to-stream transformation pipeline, you need to configure these components:
  1. Source: Connects to the CDF API to read from the source stream’s /sync endpoint.
  2. Destination: Provides credentials to write transformed records back to CDF.
  3. Mapping: Transforms records from the source stream format to the destination stream format.
  4. Job: Ties everything together and runs on a schedule with pagination and incremental load handling.

Step 1: Create the source

The source defines how the REST extractor connects to the CDF API to read records from your source stream. Since you’re reading from CDF, use your CDF cluster as the host and configure OAuth client credentials authentication.
from cognite.client import CogniteClient
from cognite.client.data_classes.hosted_extractors.sources import (
    RestSourceWrite,
    RESTClientCredentialsAuthenticationWrite
)

client = CogniteClient()

source = client.hosted_extractors.sources.create(
    RestSourceWrite(
        external_id="my-records-source",
        host="<your-cdf-cluster>.cognitedata.com",
        scheme="https",
        authentication=RESTClientCredentialsAuthenticationWrite(
            client_id="<your-client-id>",
            client_secret="<your-client-secret>",
            token_url="<your-token-url>",
            scopes="<your-scopes>"
        )
    )
)
Replace <your-cdf-cluster>, <your-client-id>, <your-client-secret>, <your-token-url>, and <your-scopes> with your actual values.
The source credentials must have streamrecords:READ capability scoped to the source stream’s space.

Step 2: Create the destination

The destination provides credentials for the extractor to write transformed records to CDF. Create a session and use its nonce to authenticate the destination.
from cognite.client import CogniteClient
from cognite.client.data_classes.hosted_extractors import DestinationWrite, SessionWrite
from cognite.client.data_classes.iam import ClientCredentials

client = CogniteClient()

# Create a session for the destination
session = client.iam.sessions.create(
    client_credentials=ClientCredentials(
        client_id="<your-client-id>",
        client_secret="<your-client-secret>",
    ),
    session_type="CLIENT_CREDENTIALS"
)

# Create the destination using the session nonce
destination = client.hosted_extractors.destinations.create(
    DestinationWrite(
        external_id="my-records-destination",
        credentials=SessionWrite(session.nonce)
    )
)
The destination credentials must have streamrecords:WRITE capability scoped to the destination stream’s space.

Step 3: Create the mapping

The mapping defines how to transform each record from the source stream into the output format for the destination stream. The /sync endpoint returns a response containing an items array. The mapping iterates over these items and transforms each one. For writing to Records streams, the output must include:
  • type: Set to "immutable_record" for immutable streams.
  • space: The space where the destination stream exists.
  • stream: The external ID of the destination stream.
  • externalId: A unique identifier for each record.
  • sources: An array of container sources with their properties.
  • beta: Set to true, as the Records destination is currently in beta.
from cognite.client import CogniteClient
from cognite.client.data_classes.hosted_extractors.mappings import MappingWrite, CustomMapping

client = CogniteClient()

mapping_expression = """
input.items.map(item => {
    "type": "immutable_record",
    "space": "my-space",
    "externalId": item.externalId,
    "stream": "my-destination-stream",
    "beta": true,
    "sources": [
        {
            "source": {
                "space": "my-space",
                "externalId": "my-container",
                "type": "container"
            },
            "properties": item.properties["my-space"]["my-container"]
        }
    ]
})
"""

mapping = client.hosted_extractors.mappings.create(
    MappingWrite(
        external_id="my-records-mapping",
        mapping=CustomMapping(mapping_expression),
        published=True,
        input="json"
    )
)
See custom mappings for the full mapping language reference.

Step 4: Create the job

The job ties everything together. It reads from the source stream’s /sync endpoint on a schedule, applies the mapping, and writes to the destination. Configure incremental load to initialize the cursor on the first run, and pagination to handle cursor-based iteration through records.
from cognite.client import CogniteClient
from cognite.client.data_classes.hosted_extractors.jobs import (
    JobWrite,
    CustomFormat,
    RestConfig
)
from cognite.client.data_classes.hosted_extractors import BodyLoad

client = CogniteClient()

source_stream_id = "my-source-stream"
project = client.config.project

job = client.hosted_extractors.jobs.create(
    JobWrite(
        external_id="my-records-sync-job",
        destination_id="my-records-destination",
        source_id="my-records-source",
        format=CustomFormat(mapping_id="my-records-mapping"),
        config=RestConfig(
            interval="1h",
            path=f"/api/v1/projects/{project}/streams/{source_stream_id}/records/sync",
            method="post",
            headers={"Content-Type": "application/json"},
            body={
                "limit": 1000,
                "filter": {
                    "hasData": [
                        {
                            "type": "container",
                            "space": "my-space",
                            "externalId": "my-container"
                        }
                    ]
                }
            },
            # Initialize cursor on first run
            incremental_load=BodyLoad(
                value='{...context.request.body, ...{"initializeCursor": "30d-ago"}}'
            ),
            # Use cursor from response for subsequent requests
            pagination=BodyLoad(
                value='if(body.nextCursor, {...context.request.body, ...{"cursor": body.nextCursor}}, null)'
            ),
        ),
    )
)
The job configuration includes:
| Parameter | Description |
|-----------|-------------|
| interval | How often the job runs (for example, 1h, 15m, 1d). |
| path | The /sync endpoint path for the source stream. |
| body | The base request body with limit and filter parameters. |
| incremental_load | Sets initializeCursor on the first request to start syncing from a specific point (for example, 30d-ago). |
| pagination | Uses nextCursor from the response to paginate through all records. Returns null to stop when no more records exist. |
The incremental_load expression merges the base body with the initializeCursor parameter. The pagination expression checks if nextCursor exists in the response and either continues with the cursor or returns null to stop pagination.

Step 5: Monitor the job

After creating the job, you can monitor its status, logs, and metrics.
from cognite.client import CogniteClient

client = CogniteClient()

# Check job logs
logs = client.hosted_extractors.jobs.list_logs(job="my-records-sync-job")

# Check job metrics
metrics = client.hosted_extractors.jobs.list_metrics(job="my-records-sync-job")
Expected outcome: The job runs on the configured interval, reads records from the source stream, transforms them using your mapping, and writes them to the destination stream.
This composable architecture lets you ingest data once in its raw format, then create streams containing standardized, use-case-specific record schemas from the source stream. This builds a robust and maintainable data processing ecosystem within CDF for your high-volume OT event and log data.