# Get started with Records

> Learn Records core concepts and patterns through a hands-on tutorial. Follow along with a SCADA alarm management example that demonstrates schemas, streams, ingestion, querying, and stream-to-stream pipelines.

This hands-on guide teaches you the core patterns for working with Records in Cognite Data Fusion (CDF). You'll learn how to create schemas, set up streams, ingest high-volume event data, query records efficiently, and build data pipelines between streams.

Throughout this guide, **SCADA alarm management** serves as a practical example, but these patterns apply to any high-volume industrial data: multi-value sensor readings, event frames, equipment logs, telemetry events, and more. The Records service is built specifically for ingesting billions of events and querying them efficiently.

<Warning>
  This tutorial teaches Records patterns and APIs; it does not cover capacity planning. Before you create production streams or ingest at scale, read and understand [Choose and size streams](/cdf/dm/records/guides/choose_and_size_streams). Use that guide to confirm records fit your workload, choose stream templates, and plan storage and retention.
</Warning>

Records are designed primarily for **immutable data** like logs, events, and historical records that don't change after ingestion. If your data needs updates during its lifecycle (like active alarms awaiting acknowledgment), you can use a mutable stream as a transitional stage, and then archive the finalized records to an immutable stream. For data that requires frequent updates and will never be archived, consider using [data modeling nodes](/cdf/dm/dm_concepts/dm_spaces_instances#instance) instead.
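
The guidance above comes down to two questions. Does the data change after ingestion? Is it archived once it's final? The following Python sketch encodes that decision. `choose_storage` is a hypothetical helper for illustration only and is not part of the Cognite SDK.

```python
def choose_storage(changes_after_ingestion: bool, archived_when_final: bool) -> str:
    """Hypothetical helper that encodes the guidance above; not part of any Cognite SDK."""
    if not changes_after_ingestion:
        # Logs, events, and historical records that never change after ingestion
        return "immutable stream"
    if archived_when_final:
        # Data with an active phase (for example, alarms awaiting acknowledgment)
        return "mutable stream, then archive to an immutable stream"
    # Data that keeps changing and is never archived
    return "data modeling nodes"


# Active alarms are updated when acknowledged, then archived once final
print(choose_storage(changes_after_ingestion=True, archived_when_final=True))
```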

By the end of this guide, you'll have a working example that handles the full lifecycle: ingesting data, updating records during their active phase, querying efficiently, and archiving completed records to immutable storage.

If you're new to Records concepts like spaces, containers, and streams, see [Records and streams](/cdf/dm/records/concepts/records_and_streams) for background.

## Prerequisites

1. Configure [access control](/cdf/access/guides/capabilities#records) for your user or service account.
2. Set up a development environment with one of the following:
   * Python with `cognite-sdk` installed, or
   * Node.js/TypeScript with `@cognite/sdk` installed

<AccordionGroup>
  <Accordion title="Example: configure capabilities">
    <CodeGroup>
      ```python Python theme={"languages":{"custom":["/_languages/kuiper.json","../_languages/kuiper.json"]}}
      from cognite.client import CogniteClient
      from cognite.client.data_classes import GroupWrite
      from cognite.client.data_classes.capabilities import (
          DataModelsAcl,
          StreamRecordsAcl,
          StreamsAcl,
      )

      client = CogniteClient()

      my_group = client.iam.groups.create(
          GroupWrite(
              name="Records Access Group",
              capabilities=[
                  DataModelsAcl(
                      actions=[
                          DataModelsAcl.Action.Read,
                          DataModelsAcl.Action.Write
                      ],
                      scope=DataModelsAcl.Scope.All(),
                  ),
                  StreamRecordsAcl(
                      actions=[
                          StreamRecordsAcl.Action.Read,
                          StreamRecordsAcl.Action.Write
                      ],
                      scope=StreamRecordsAcl.Scope.All(),
                  ),
                  StreamsAcl(
                      actions=[
                          StreamsAcl.Action.Read,
                          StreamsAcl.Action.Create,
                          StreamsAcl.Action.Delete,
                      ],
                      scope=StreamsAcl.Scope.All(),
                  ),
              ],
          )
      )
      ```

      ```javascript JavaScript/TypeScript theme={"languages":{"custom":["/_languages/kuiper.json","../_languages/kuiper.json"]}}
      import { CogniteClient } from '@cognite/sdk';

      const client = new CogniteClient({ appId: 'records-tutorial' });

      const myGroup = await client.groups.create([{
        name: 'Records Access Group',
        capabilities: [
          {
            dataModelsAcl: {
              actions: ['READ', 'WRITE'],
              scope: { all: {} }
            }
          },
          {
            streamRecordsAcl: {
              actions: ['READ', 'WRITE'],
              scope: { all: {} }
            }
          },
          {
            streamsAcl: {
              actions: ['READ', 'CREATE', 'DELETE'],
              scope: { all: {} }
            }
          }
        ]
      }]);
      ```
    </CodeGroup>
  </Accordion>
</AccordionGroup>
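
Before you continue, you may want to confirm that a group grants every action this tutorial uses. This Python sketch compares capabilities, written in the same JSON shape as the JavaScript example above, against the required actions. `missing_actions` and `REQUIRED_ACTIONS` are hypothetical names. The check assumes that each capability object holds one ACL, and it ignores scopes.

```python
# Hypothetical: the ACLs and actions granted by this tutorial's example group.
# Scopes are ignored in this check.
REQUIRED_ACTIONS = {
    "dataModelsAcl": {"READ", "WRITE"},
    "streamRecordsAcl": {"READ", "WRITE"},
    "streamsAcl": {"READ", "CREATE", "DELETE"},
}


def missing_actions(capabilities: list) -> dict:
    """Return {acl_name: missing_actions} for every required ACL that is not fully granted."""
    granted = {}
    for capability in capabilities:
        # Each capability is assumed to look like {"streamsAcl": {"actions": [...], "scope": {...}}}
        for acl_name, acl in capability.items():
            granted.setdefault(acl_name, set()).update(acl.get("actions", []))
    missing = {}
    for acl_name, actions in REQUIRED_ACTIONS.items():
        gap = actions - granted.get(acl_name, set())
        if gap:
            missing[acl_name] = gap
    return missing


group_capabilities = [
    {"dataModelsAcl": {"actions": ["READ", "WRITE"], "scope": {"all": {}}}},
    {"streamsAcl": {"actions": ["READ", "CREATE"], "scope": {"all": {}}}},
]
print(missing_actions(group_capabilities))
```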

## Build your first Records pipeline

In this section, you'll build a complete data pipeline using a SCADA alarm management scenario as an example. This pipeline demonstrates the full Records lifecycle: using a mutable stream as a temporary staging area for active data, ingesting and updating records, querying with different endpoints, and archiving finalized records to an immutable stream for permanent storage.

This pattern mirrors what you'd deploy in production for managing any high-volume event data where records have a defined lifecycle.

<Steps>
  <Step title="Create a space and define the alarm schema">
    Start by defining the schema for your alarm records. You'll create a [space](/cdf/dm/dm_concepts/dm_spaces_instances#space) to organize your data and a [container](/cdf/dm/dm_concepts/dm_containers_views_datamodels#containers) that defines the structure of each alarm event. If you're new to these concepts, check out the [data modeling concepts](/cdf/dm/dm_concepts/dm_containers_views_datamodels) for background.

    1. **Create a space** to serve as a namespace for your SCADA alarm data. This keeps your alarm schemas and records organized and separate from other data in your CDF project.

    <AccordionGroup>
      <Accordion title="Example: create a space">
        <CodeGroup>
          ```python Python theme={"languages":{"custom":["/_languages/kuiper.json","../_languages/kuiper.json"]}}
          from cognite.client import CogniteClient
          from cognite.client.data_classes.data_modeling import SpaceApply

          client = CogniteClient()

          space_id = "scada-alarms"
          space = client.data_modeling.spaces.apply(
              SpaceApply(
                  space=space_id,
                  description="SCADA alarm events",
                  name="SCADA Alarms",
              )
          )
          ```

          ```javascript JavaScript/TypeScript theme={"languages":{"custom":["/_languages/kuiper.json","../_languages/kuiper.json"]}}
          import { CogniteClient } from '@cognite/sdk';

          const client = new CogniteClient({ appId: 'records-tutorial' });

          const spaceId = 'scada-alarms';
          const space = await client.spaces.upsert([
            {
              space: spaceId,
              description: 'SCADA alarm events',
              name: 'SCADA Alarms',
            }
          ]);
          ```
        </CodeGroup>
      </Accordion>
    </AccordionGroup>

    2. **Create and define the container** that specifies what properties each alarm record will have. This schema includes the fields you'd typically see in SCADA systems: alarm identifiers, severity levels, numeric values, priorities, acknowledgment status, and timestamps.

    <Note>
      When you create containers for records, set `usedFor` (`used_for` in the Python SDK) to `"record"`. You can only ingest records into containers designated for records.
    </Note>

    <AccordionGroup>
      <Accordion title="Example: create SCADA alarm container">
        <CodeGroup>
          ```python Python theme={"languages":{"custom":["/_languages/kuiper.json","../_languages/kuiper.json"]}}
          from cognite.client import CogniteClient
          from cognite.client.data_classes.data_modeling import (
              ContainerApply,
              ContainerProperty,
              Text,
              Float64,
              Int64,
              Boolean,
              Timestamp,
          )

          client = CogniteClient()

          container_id = "alarm_events"
          container = client.data_modeling.containers.apply(
              ContainerApply(
                  space="scada-alarms",
                  external_id=container_id,
                  name="SCADA Alarm Events",
                  description="Container for SCADA alarm event data",
                  used_for="record",
                  properties={
                      # Text property - alarm identifier
                      "alarm_id": ContainerProperty(
                          type=Text(is_list=False),
                          nullable=False,
                          name="Alarm ID",
                          description="Unique identifier for the alarm",
                      ),
                      # Text property - alarm severity
                      "severity": ContainerProperty(
                          type=Text(is_list=False),
                          nullable=False,
                          name="Severity",
                          description="Alarm severity level (CRITICAL, HIGH, MEDIUM, LOW)",
                      ),
                      # Float property - alarm value
                      "value": ContainerProperty(
                          type=Float64(is_list=False),
                          nullable=False,
                          name="Value",
                          description="Numeric value associated with the alarm condition",
                      ),
                      # Int property - priority score
                      "priority": ContainerProperty(
                          type=Int64(is_list=False),
                          nullable=False,
                          name="Priority",
                          description="Priority score (1-100)",
                      ),
                      # Boolean property - acknowledged status
                      "is_acknowledged": ContainerProperty(
                          type=Boolean(is_list=False),
                          nullable=False,
                          name="Acknowledged Status",
                          description="True if alarm has been acknowledged",
                      ),
                      # Timestamp property - when alarm occurred
                      "timestamp": ContainerProperty(
                          type=Timestamp(is_list=False),
                          nullable=False,
                          name="Timestamp",
                          description="Time when the alarm occurred",
                      ),
                  },
              )
          )
          ```

          ```javascript JavaScript/TypeScript theme={"languages":{"custom":["/_languages/kuiper.json","../_languages/kuiper.json"]}}
          import { CogniteClient } from '@cognite/sdk';

          const client = new CogniteClient({ appId: 'records-tutorial' });

          const containerId = 'alarm_events';
          const container = await client.containers.upsert([
            {
              space: 'scada-alarms',
              externalId: containerId,
              name: 'SCADA Alarm Events',
              description: 'Container for SCADA alarm event data',
              usedFor: 'record',
              properties: {
                alarm_id: {
                  type: { type: 'text' },
                  nullable: false,
                  name: 'Alarm ID',
                  description: 'Unique identifier for the alarm',
                },
                severity: {
                  type: { type: 'text' },
                  nullable: false,
                  name: 'Severity',
                  description: 'Alarm severity level (CRITICAL, HIGH, MEDIUM, LOW)',
                },
                value: {
                  type: { type: 'float64' },
                  nullable: false,
                  name: 'Value',
                  description: 'Numeric value associated with the alarm condition',
                },
                priority: {
                  type: { type: 'int64' },
                  nullable: false,
                  name: 'Priority',
                  description: 'Priority score (1-100)',
                },
                is_acknowledged: {
                  type: { type: 'boolean' },
                  nullable: false,
                  name: 'Acknowledged Status',
                  description: 'True if alarm has been acknowledged',
                },
                timestamp: {
                  type: { type: 'timestamp' },
                  nullable: false,
                  name: 'Timestamp',
                  description: 'Time when the alarm occurred',
                },
              },
            }
          ]);
          ```
        </CodeGroup>
      </Accordion>
    </AccordionGroup>
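
Records must match the container schema, so it can help to catch obvious mistakes before you send a request. This Python sketch mirrors the `alarm_events` container as a plain dictionary and checks a set of properties locally. `ALARM_SCHEMA`, `validate_alarm`, and the type checks are hypothetical approximations. The Records service performs the authoritative validation.

```python
from datetime import datetime

# Hypothetical local mirror of the alarm_events container: property -> (type, nullable)
ALARM_SCHEMA = {
    "alarm_id": ("text", False),
    "severity": ("text", False),
    "value": ("float64", False),
    "priority": ("int64", False),
    "is_acknowledged": ("boolean", False),
    "timestamp": ("timestamp", False),
}


def _matches(type_name: str, value) -> bool:
    """Approximate type check for one property value."""
    if type_name == "text":
        return isinstance(value, str)
    if type_name == "boolean":
        return isinstance(value, bool)
    if isinstance(value, bool):
        # bool is a subclass of int in Python, so reject it for numeric and timestamp types
        return False
    if type_name == "int64":
        return isinstance(value, int)
    if type_name == "float64":
        return isinstance(value, (int, float))
    if type_name == "timestamp":
        if not isinstance(value, str):
            return False
        try:
            # Python versions before 3.11 reject a trailing "Z" in fromisoformat
            datetime.fromisoformat(value.replace("Z", "+00:00"))
        except ValueError:
            return False
        return True
    return False


def validate_alarm(properties: dict) -> list:
    """Return a list of problems; an empty list means the properties look valid locally."""
    errors = []
    for name, (type_name, nullable) in ALARM_SCHEMA.items():
        if properties.get(name) is None:
            if not nullable:
                errors.append(f"missing required property: {name}")
        elif not _matches(type_name, properties[name]):
            errors.append(f"{name}: expected {type_name}, got {properties[name]!r}")
    for name in sorted(set(properties) - set(ALARM_SCHEMA)):
        errors.append(f"unknown property: {name}")
    return errors


alarm = {
    "alarm_id": "ALARM-PRESS-001",
    "severity": "CRITICAL",
    "value": 125.8,
    "priority": 95,
    "is_acknowledged": False,
    "timestamp": "2025-10-22T10:00:00.000Z",
}
print(validate_alarm(alarm))  # → []
print(validate_alarm({"alarm_id": "ALARM-PRESS-001", "priority": "high"}))
```
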
  </Step>

  <Step title="Create streams for different lifecycles">
    Now create the streams that will hold your alarm records. Streams need a [stream settings template](/cdf/dm/records/concepts/records_and_streams#stream-templates) that defines their behavior, performance characteristics, and lifecycle policies. For this pipeline, you'll create two streams to handle different stages of the alarm lifecycle.

    Choose your stream template based on **data lifecycle and throughput requirements**, not on record type or source system. For the detailed limits of each template, see the [Streams API documentation](/api-reference/concepts/20230101/streams).
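
If you create streams from code, you can keep the template choice in one place. This Python sketch builds the request body for the streams endpoint from a lifecycle label, using the same body shape as the examples in this step. `stream_create_body` and the `mutable`/`immutable` labels are hypothetical. The mapping covers only mutability and does not account for throughput or sizing, which [Choose and size streams](/cdf/dm/records/guides/choose_and_size_streams) covers.

```python
import json

# Template names used in this tutorial: BasicLiveData for data that changes,
# BasicArchive for write-once data. Throughput and sizing are not considered here.
TEMPLATE_BY_LIFECYCLE = {"mutable": "BasicLiveData", "immutable": "BasicArchive"}


def stream_create_body(external_id: str, lifecycle: str) -> dict:
    """Build a request body for the streams endpoint (hypothetical helper)."""
    if lifecycle not in TEMPLATE_BY_LIFECYCLE:
        raise ValueError(f"unknown lifecycle {lifecycle!r}; expected one of {sorted(TEMPLATE_BY_LIFECYCLE)}")
    template = TEMPLATE_BY_LIFECYCLE[lifecycle]
    return {"items": [{"externalId": external_id, "settings": {"template": {"name": template}}}]}


body = stream_create_body("alarm_data_archive_stream", "immutable")
print(json.dumps(body, indent=2))
# With the Python SDK, you could send it the same way as the examples in this step:
# client.post(url=f"/api/v1/projects/{client.config.project}/streams", json=body)
```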

    * **Create a mutable stream** for active alarms that need real-time updates, like when an operator acknowledges an alarm. This stream uses the `BasicLiveData` template, which is optimized for data that changes frequently.

    <AccordionGroup>
      <Accordion title="Example: create a mutable stream">
        <CodeGroup>
          ```bash curl theme={"languages":{"custom":["/_languages/kuiper.json","../_languages/kuiper.json"]}}
          curl -X POST \
            "https://${CLUSTER}.cognitedata.com/api/v1/projects/${PROJECT}/streams" \
            -H "Content-Type: application/json" \
            -H "Authorization: Bearer ${TOKEN}" \
            -d '{
              "items": [
                {
                  "externalId": "alarm_data_live_stream",
                  "settings": {"template": {"name": "BasicLiveData"}}
                }
              ]
            }'
          ```

          ```python Python theme={"languages":{"custom":["/_languages/kuiper.json","../_languages/kuiper.json"]}}
          from cognite.client import CogniteClient

          client = CogniteClient()

          mutable_stream_id = "alarm_data_live_stream"
          response = client.post(
              url=f"/api/v1/projects/{client.config.project}/streams",
              json={
                  "items": [
                      {
                          "externalId": mutable_stream_id,
                          "settings": {"template": {"name": "BasicLiveData"}},
                      }
                  ]
              }
          )
          ```

          ```javascript JavaScript/TypeScript theme={"languages":{"custom":["/_languages/kuiper.json","../_languages/kuiper.json"]}}
          import { CogniteClient } from '@cognite/sdk';

          const client = new CogniteClient({ appId: 'records-tutorial' });

          const mutableStreamId = 'alarm_data_live_stream';
          const response = await client.streams.create({
            externalId: mutableStreamId,
            settings: {
              template: {
                name: 'BasicLiveData',
              },
            },
          });
          ```
        </CodeGroup>
      </Accordion>
    </AccordionGroup>

    * **Create an immutable stream** for archived alarms that won't change after they're written. This stream uses the `BasicArchive` template and is designed for write-once, read-many scenarios and long-term retention.

    <AccordionGroup>
      <Accordion title="Example: create an immutable stream">
        <CodeGroup>
          ```bash curl theme={"languages":{"custom":["/_languages/kuiper.json","../_languages/kuiper.json"]}}
          curl -X POST \
            "https://${CLUSTER}.cognitedata.com/api/v1/projects/${PROJECT}/streams" \
            -H "Content-Type: application/json" \
            -H "Authorization: Bearer ${TOKEN}" \
            -d '{
              "items": [
                {
                  "externalId": "alarm_data_archive_stream",
                  "settings": {"template": {"name": "BasicArchive"}}
                }
              ]
            }'
          ```

          ```python Python theme={"languages":{"custom":["/_languages/kuiper.json","../_languages/kuiper.json"]}}
          from cognite.client import CogniteClient

          client = CogniteClient()

          immutable_stream_id = "alarm_data_archive_stream"
          response = client.post(
              url=f"/api/v1/projects/{client.config.project}/streams",
              json={
                  "items": [
                      {
                          "externalId": immutable_stream_id,
                          "settings": {"template": {"name": "BasicArchive"}},
                      }
                  ]
              }
          )
          ```

          ```javascript JavaScript/TypeScript theme={"languages":{"custom":["/_languages/kuiper.json","../_languages/kuiper.json"]}}
          import { CogniteClient } from '@cognite/sdk';

          const client = new CogniteClient({ appId: 'records-tutorial' });

          const immutableStreamId = 'alarm_data_archive_stream';
          const response = await client.streams.create({
            externalId: immutableStreamId,
            settings: {
              template: {
                name: 'BasicArchive',
              },
            },
          });
          ```
        </CodeGroup>
      </Accordion>
    </AccordionGroup>
  </Step>

  <Step title="Ingest records">
    Now you'll ingest alarm records into your mutable stream. In production, you'd typically use [a Cognite extractor](/cdf/integration/concepts/extraction/index#cognite-extractors) that reads directly from your source system and writes to Records. For this tutorial, you'll use the API directly to simulate the ingestion process.

    <Warning>
      The Records service does **not** support onboarding data through RAW and Transformations. Ingest data directly into a Records stream instead, either through the Records API, as this tutorial does, or with one of the supported extractors:

      * [OPC UA Extractor](/cdf/integration/guides/extraction/opc_ua) (supports immutable streams)
      * [PI AF Extractor](/cdf/integration/guides/extraction/pi_af)
      * [Hosted REST Extractor](/cdf/integration/guides/extraction/rest)
    </Warning>

    When you write records through the API, specify the target stream identifier in the request path, and reference the container by its `space` and `externalId` in each record's `sources` so the service knows which schema the data follows.
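
To make the two identifiers explicit, this Python sketch separates the request path, which carries the stream, from the record body, which references the container. `records_url` and `record_item` are hypothetical helpers, and `my-project` is a placeholder project name. The record shape matches the examples in this step.

```python
import uuid
from typing import Optional


def records_url(project: str, stream_id: str) -> str:
    # The target stream goes in the request path, not in the record body
    return f"/api/v1/projects/{project}/streams/{stream_id}/records"


def record_item(space: str, container_space: str, container_id: str,
                properties: dict, external_id: Optional[str] = None) -> dict:
    """Wrap one set of property values in the record shape used in this step (hypothetical helper)."""
    return {
        "space": space,
        "externalId": external_id or str(uuid.uuid4()),
        "sources": [
            {
                # The container's space + externalId tell the service which schema applies
                "source": {"type": "container", "space": container_space, "externalId": container_id},
                "properties": properties,
            }
        ],
    }


item = record_item(
    "scada-alarms", "scada-alarms", "alarm_events",
    {"alarm_id": "ALARM-PRESS-001", "severity": "CRITICAL", "value": 125.8,
     "priority": 95, "is_acknowledged": False, "timestamp": "2025-10-22T10:00:00.000Z"},
    external_id="record-001",
)
url = records_url("my-project", "alarm_data_live_stream")
body = {"items": [item]}
```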

    1. **Start by ingesting a couple of hardcoded alarm records** to verify your setup works. This gives you a small dataset to test queries and updates before adding more data.

    <AccordionGroup>
      <Accordion title="Example: ingest initial alarm records">
        <CodeGroup>
          ```bash curl theme={"languages":{"custom":["/_languages/kuiper.json","../_languages/kuiper.json"]}}
          curl -X POST \
            "https://${CLUSTER}.cognitedata.com/api/v1/projects/${PROJECT}/streams/${STREAM_ID}/records" \
            -H "Content-Type: application/json" \
            -H "Authorization: Bearer ${TOKEN}" \
            -d '{
              "items": [
                {
                  "space": "scada-alarms",
                  "externalId": "record-001",
                  "sources": [{
                    "source": {
                      "type": "container",
                      "space": "scada-alarms",
                      "externalId": "alarm_events"
                    },
                    "properties": {
                      "alarm_id": "ALARM-PRESS-001",
                      "severity": "CRITICAL",
                      "value": 125.8,
                      "priority": 95,
                      "is_acknowledged": false,
                      "timestamp": "2025-10-22T10:00:00.000Z"
                    }
                  }]
                },
                {
                  "space": "scada-alarms",
                  "externalId": "record-002",
                  "sources": [{
                    "source": {
                      "type": "container",
                      "space": "scada-alarms",
                      "externalId": "alarm_events"
                    },
                    "properties": {
                      "alarm_id": "ALARM-TEMP-042",
                      "severity": "MEDIUM",
                      "value": 88.3,
                      "priority": 60,
                      "is_acknowledged": false,
                      "timestamp": "2025-10-22T10:00:00.000Z"
                    }
                  }]
                }
              ]
            }'
          ```

          ```python Python theme={"languages":{"custom":["/_languages/kuiper.json","../_languages/kuiper.json"]}}
          from cognite.client import CogniteClient
          import uuid

          client = CogniteClient()

          space_id = "scada-alarms"
          container_id = "alarm_events"
          mutable_stream_id = "alarm_data_live_stream"

          # Create two hardcoded alarm records
          hardcoded_records = [
              {
                  "alarm_id": "ALARM-PRESS-001",
                  "severity": "CRITICAL",
                  "value": 125.8,
                  "priority": 95,
                  "is_acknowledged": False,
                  "timestamp": "2025-10-22T10:00:00.000Z",
              },
              {
                  "alarm_id": "ALARM-TEMP-042",
                  "severity": "MEDIUM",
                  "value": 88.3,
                  "priority": 60,
                  "is_acknowledged": False,
                  "timestamp": "2025-10-22T10:00:00.000Z",
              },
          ]

          # Prepare records for ingestion
          items = []
          for record in hardcoded_records:
              ext_id = str(uuid.uuid4())
              items.append(
                  {
                      "space": space_id,
                      "externalId": ext_id,
                      "sources": [
                          {
                              "source": {
                                  "type": "container",
                                  "space": space_id,
                                  "externalId": container_id,
                              },
                              "properties": record,
                          }
                      ],
                  }
              )

          # Ingest hardcoded records
          response = client.post(
              url=f"/api/v1/projects/{client.config.project}/streams/{mutable_stream_id}/records",
              json={"items": items},
          )
          ```

          ```javascript JavaScript/TypeScript theme={"languages":{"custom":["/_languages/kuiper.json","../_languages/kuiper.json"]}}
          import { CogniteClient } from '@cognite/sdk';

          const client = new CogniteClient({ appId: 'records-tutorial' });

          const spaceId = 'scada-alarms';
          const containerId = 'alarm_events';
          const mutableStreamId = 'alarm_data_live_stream';

          // Create two hardcoded alarm records
          const hardcodedRecords = [
            {
              alarm_id: 'ALARM-PRESS-001',
              severity: 'CRITICAL',
              value: 125.8,
              priority: 95,
              is_acknowledged: false,
              timestamp: '2025-10-22T10:00:00.000Z',
            },
            {
              alarm_id: 'ALARM-TEMP-042',
              severity: 'MEDIUM',
              value: 88.3,
              priority: 60,
              is_acknowledged: false,
              timestamp: '2025-10-22T10:00:00.000Z',
            },
          ];

          // Prepare records for ingestion
          const items = hardcodedRecords.map((record) => ({
            space: spaceId,
            externalId: crypto.randomUUID(),
            sources: [
              {
                source: {
                  type: 'container' as const,
                  space: spaceId,
                  externalId: containerId,
                },
                properties: record,
              },
            ],
          }));

          // Ingest hardcoded records
          await client.records.ingest(mutableStreamId, items);
          ```
        </CodeGroup>
      </Accordion>
    </AccordionGroup>

    2. **Add 100 random alarm records** to simulate a realistic dataset. This larger dataset will make your queries and aggregations more meaningful and help you see how Records handles volume.

    <AccordionGroup>
      <Accordion title="Example: ingest random alarm records for larger dataset">
        <CodeGroup>
          ```python Python theme={"languages":{"custom":["/_languages/kuiper.json","../_languages/kuiper.json"]}}
          from cognite.client import CogniteClient
          from datetime import datetime, timedelta, timezone
          import random
          import uuid

          client = CogniteClient()

          space_id = "scada-alarms"
          container_id = "alarm_events"
          mutable_stream_id = "alarm_data_live_stream"

          # Generate 100 random alarm records
          alarm_ids = [
              "ALARM-PRESS-001",
              "ALARM-TEMP-042",
              "ALARM-FLOW-013",
              "ALARM-VIBR-088",
              "ALARM-LEVEL-055",
          ]
          severities = ["CRITICAL", "HIGH", "MEDIUM", "LOW"]
          # Use UTC so the trailing "Z" in the formatted timestamps is accurate
          base_time = datetime.now(timezone.utc) - timedelta(hours=2)

          random_records = [
              {
                  "alarm_id": random.choice(alarm_ids),
                  "severity": random.choice(severities),
                  "value": round(random.uniform(50.0, 150.0), 2),
                  "priority": random.randint(20, 100),
                  "is_acknowledged": random.choice([True, False]),
                  "timestamp": (base_time + timedelta(minutes=i)).strftime("%Y-%m-%dT%H:%M:%S.000Z"),
              }
              for i in range(100)
          ]

          # Prepare random records for ingestion
          items = []
          for record in random_records:
              items.append(
                  {
                      "space": space_id,
                      "externalId": str(uuid.uuid4()),
                      "sources": [
                          {
                              "source": {
                                  "type": "container",
                                  "space": space_id,
                                  "externalId": container_id,
                              },
                              "properties": record,
                          }
                      ],
                  }
              )

          # Ingest random records
          response = client.post(
              url=f"/api/v1/projects/{client.config.project}/streams/{mutable_stream_id}/records",
              json={"items": items},
          )
          ```

          ```javascript JavaScript/TypeScript theme={"languages":{"custom":["/_languages/kuiper.json","../_languages/kuiper.json"]}}
          import { CogniteClient } from '@cognite/sdk';

          const client = new CogniteClient({ appId: 'records-tutorial' });

          const spaceId = 'scada-alarms';
          const containerId = 'alarm_events';
          const mutableStreamId = 'alarm_data_live_stream';

          // Generate 100 random alarm records
          const alarmIds = [
            'ALARM-PRESS-001',
            'ALARM-TEMP-042',
            'ALARM-FLOW-013',
            'ALARM-VIBR-088',
            'ALARM-LEVEL-055',
          ];
          const severities = ['CRITICAL', 'HIGH', 'MEDIUM', 'LOW'];
          const baseTime = new Date(Date.now() - 2 * 60 * 60 * 1000); // 2 hours ago

          const randomRecords = Array.from({ length: 100 }, (_, i) => {
            const timestamp = new Date(baseTime.getTime() + i * 60 * 1000);
            return {
              alarm_id: alarmIds[Math.floor(Math.random() * alarmIds.length)],
              severity: severities[Math.floor(Math.random() * severities.length)],
              value: Math.round((50 + Math.random() * 100) * 100) / 100,
              priority: Math.floor(20 + Math.random() * 81),
              is_acknowledged: Math.random() > 0.5,
              timestamp: timestamp.toISOString(),
            };
          });

          // Prepare random records for ingestion
          const items = randomRecords.map((record) => ({
            space: spaceId,
            externalId: crypto.randomUUID(),
            sources: [
              {
                source: {
                  type: 'container' as const,
                  space: spaceId,
                  externalId: containerId,
                },
                properties: record,
              },
            ],
          }));

          // Ingest random records
          await client.records.ingest(mutableStreamId, items);
          ```
        </CodeGroup>
      </Accordion>
    </AccordionGroup>
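
For larger volumes, you'd typically split the items across several requests instead of sending them all at once. This guide doesn't state a per-request item limit, so `BATCH_SIZE` below is an assumed placeholder. Check the Records API reference for the actual limit. `batched` is a hypothetical helper. Python 3.12 also includes `itertools.batched`, which yields tuples.

```python
from typing import Iterator, List

# Assumption: placeholder batch size, not a documented Records limit.
BATCH_SIZE = 100


def batched(items: List[dict], size: int = BATCH_SIZE) -> Iterator[List[dict]]:
    """Yield consecutive slices of at most `size` items (hypothetical helper)."""
    if size < 1:
        raise ValueError("size must be at least 1")
    for start in range(0, len(items), size):
        yield items[start:start + size]


items = [{"externalId": f"record-{i:03d}"} for i in range(250)]
batch_sizes = [len(batch) for batch in batched(items)]
print(batch_sizes)  # → [100, 100, 50]

# Each batch would then be sent as its own request, for example:
# for batch in batched(items):
#     client.post(url=f"/api/v1/projects/{client.config.project}/streams/{mutable_stream_id}/records",
#                 json={"items": batch})
```
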
  </Step>

  <Step title="Update records (mutable streams only)">
    One advantage of mutable streams is that you can update records after they're written. This is essential for scenarios like acknowledging an alarm when an operator responds, updating status fields as conditions change, or correcting data errors. Use the `upsert` endpoint to update an existing record, identified by its `space` and `externalId`.

    <Warning>
      Records do **not** support partial updates. When you update a record, you must provide **all** properties in the container, not just the ones you want to change. All non-nullable properties must be included in every upsert request. Omitting one causes a validation error. See [Updating records in mutable streams](/cdf/dm/records/concepts/records_and_streams#updating-records-in-mutable-streams) for more information.
    </Warning>

    The recommended workflow for updating a record:

    1. **Read** the existing record using the `filter` or `sync` endpoint.
    2. **Merge** your changes into the full property set.
    3. **Upsert** the complete record back to the stream.
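
The merge step is where a partial update can slip through. This Python sketch assumes you've already read the record's current properties (`existing` stands in for that read). It overlays your changes and refuses to build an upsert that drops a non-nullable property. `merge_for_upsert` and `REQUIRED_PROPERTIES` are hypothetical. The set lists the non-nullable properties of the `alarm_events` container created earlier.

```python
# Non-nullable properties of the alarm_events container defined earlier
REQUIRED_PROPERTIES = {"alarm_id", "severity", "value", "priority", "is_acknowledged", "timestamp"}


def merge_for_upsert(existing: dict, changes: dict) -> dict:
    """Combine current properties with changes into a full property set (hypothetical helper)."""
    merged = {**existing, **changes}  # changed values win; untouched values are carried over
    missing = REQUIRED_PROPERTIES - set(merged)
    if missing:
        raise ValueError(f"upsert would omit non-nullable properties: {sorted(missing)}")
    return merged


# Stand-in for the properties returned when you read the record (step 1)
existing = {
    "alarm_id": "ALARM-TEMP-042", "severity": "MEDIUM", "value": 88.3,
    "priority": 60, "is_acknowledged": False, "timestamp": "2025-10-22T10:00:00.000Z",
}
acknowledged = merge_for_upsert(existing, {"is_acknowledged": True})  # step 2
print(acknowledged)  # the full property set to send in the upsert request (step 3)
```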

    To update a record, you need its `externalId`. In practice, you'd typically get this from a previous query or store it when you first ingest the record. For this example, you'll update one of the alarm records you ingested earlier to mark it as acknowledged.
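
If you'd rather not store every generated `externalId`, one alternative, not used elsewhere in this tutorial, is to derive it deterministically with `uuid.uuid5` from fields that identify an alarm occurrence. This Python sketch assumes that the combination of `alarm_id` and `timestamp` is unique per record and that timestamps are always formatted the same way. Verify both assumptions for your own data. `ALARM_NAMESPACE` and `alarm_external_id` are hypothetical.

```python
import uuid

# Hypothetical fixed namespace; derive or generate one once and keep it constant
ALARM_NAMESPACE = uuid.uuid5(uuid.NAMESPACE_URL, "https://example.com/scada-alarms")


def alarm_external_id(alarm_id: str, timestamp: str) -> str:
    # The same inputs always produce the same UUID, so the ID can be recomputed instead of stored
    return str(uuid.uuid5(ALARM_NAMESPACE, f"{alarm_id}|{timestamp}"))


first = alarm_external_id("ALARM-TEMP-042", "2025-10-22T10:00:00.000Z")
again = alarm_external_id("ALARM-TEMP-042", "2025-10-22T10:00:00.000Z")
other = alarm_external_id("ALARM-PRESS-001", "2025-10-22T10:00:00.000Z")
print(first == again, first == other)  # → True False
```

The trade-off is that the ID depends on the exact input strings: `2025-10-22T10:00:00Z` and `2025-10-22T10:00:00.000Z` produce different IDs.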

    <AccordionGroup>
      <Accordion title="Example: update an alarm to acknowledged status">
        <CodeGroup>
          ```bash curl theme={"languages":{"custom":["/_languages/kuiper.json","../_languages/kuiper.json"]}}
          curl -X POST \
            "https://${CLUSTER}.cognitedata.com/api/v1/projects/${PROJECT}/streams/${STREAM_ID}/records/upsert" \
            -H "Content-Type: application/json" \
            -H "Authorization: Bearer ${TOKEN}" \
            -d '{
              "items": [
                {
                  "space": "scada-alarms",
                  "externalId": "your-record-external-id",
                  "sources": [{
                    "source": {
                      "type": "container",
                      "space": "scada-alarms",
                      "externalId": "alarm_events"
                    },
                    "properties": {
                      "alarm_id": "ALARM-TEMP-042",
                      "severity": "MEDIUM",
                      "value": 88.3,
                      "priority": 60,
                      "is_acknowledged": true,
                      "timestamp": "2025-10-22T10:00:00.000Z"
                    }
                  }]
                }
              ]
            }'
          ```

          ```python Python theme={"languages":{"custom":["/_languages/kuiper.json","../_languages/kuiper.json"]}}
          from cognite.client import CogniteClient

          client = CogniteClient()

          space_id = "scada-alarms"
          container_id = "alarm_events"
          mutable_stream_id = "alarm_data_live_stream"

          # Assuming you have the external_id of a record you want to update
          # In practice, you would get this from a previous query or store it during ingestion
          record_external_id = "your-record-external-id"  # Replace with actual ID

          # Build the complete property set (no partial updates), with is_acknowledged set to True
          updated_record = {
              "alarm_id": "ALARM-TEMP-042",
              "severity": "MEDIUM",
              "value": 88.3,
              "priority": 60,
              "is_acknowledged": True,
              "timestamp": "2025-10-22T10:00:00.000Z",
          }

          response = client.post(
              url=f"/api/v1/projects/{client.config.project}/streams/{mutable_stream_id}/records/upsert",
              json={
                  "items": [
                      {
                          "space": space_id,
                          "externalId": record_external_id,
                          "sources": [
                              {
                                  "source": {
                                      "type": "container",
                                      "space": space_id,
                                      "externalId": container_id,
                                  },
                                  "properties": updated_record,
                              }
                          ],
                      }
                  ]
              },
          )
          ```

          ```javascript JavaScript/TypeScript theme={"languages":{"custom":["/_languages/kuiper.json","../_languages/kuiper.json"]}}
          import { CogniteClient } from '@cognite/sdk';

          const client = new CogniteClient({ appId: 'records-tutorial' });

          const spaceId = 'scada-alarms';
          const containerId = 'alarm_events';
          const mutableStreamId = 'alarm_data_live_stream';

          // Assuming you have the external_id of a record you want to update
          // In practice, you would get this from a previous query or store it during ingestion
          const recordExternalId = 'your-record-external-id'; // Replace with actual ID

          // Build the complete property set (no partial updates), with is_acknowledged set to true
          const updatedRecord = {
            alarm_id: 'ALARM-TEMP-042',
            severity: 'MEDIUM',
            value: 88.3,
            priority: 60,
            is_acknowledged: true,
            timestamp: '2025-10-22T10:00:00.000Z',
          };

          await client.records.upsert(mutableStreamId, [
            {
              space: spaceId,
              externalId: recordExternalId,
              sources: [
                {
                  source: {
                    type: 'container' as const,
                    space: spaceId,
                    externalId: containerId,
                  },
                  properties: updatedRecord,
                },
              ],
            },
          ]);
          ```
        </CodeGroup>
      </Accordion>
    </AccordionGroup>
  </Step>

  <Step title="Query records">
    Next, you'll query your alarm records using the Records API. The API provides three endpoints for consuming data, each optimized for different use cases:

    * For **interactive queries**, use the `filter` endpoint.
    * For **large datasets with pagination**, use the `sync` endpoint.
    * For **statistical analysis**, use the `aggregate` endpoint.

    The query syntax for filtering is similar across all endpoints.

    <Tip>
      When querying records, we recommend that you use the [`hasData` filter](/cdf/dm/dm_concepts/dm_querying#hasdata-filter) to ensure you only retrieve records that contain data in the container you're querying. A record can exist in a stream but may not have data for every container. The `hasData` filter ensures efficient queries by excluding records that don't have data in the selected container.
    </Tip>
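
Because the filter syntax is shared, you can build the `hasData` condition once and combine it with other conditions. The sketch below uses a hypothetical helper, `container_filter` (not an SDK function), that produces the same filter shape used in this step's examples. It assumes, as those examples suggest, that the same filter object is accepted by the `filter`, `sync`, and `aggregate` endpoints.

```python
from typing import Any, Dict


def container_filter(space_id: str, container_id: str, *conditions: Dict[str, Any]) -> Dict[str, Any]:
    """Require data in the given container, ANDed with any extra conditions (hypothetical helper)."""
    has_data = {"hasData": [{"type": "container", "space": space_id, "externalId": container_id}]}
    if not conditions:
        return has_data
    return {"and": [has_data, *conditions]}


unacknowledged = {
    "equals": {"property": ["scada-alarms", "alarm_events", "is_acknowledged"], "value": False}
}
# The resulting object goes into the "filter" field of a request body
print(container_filter("scada-alarms", "alarm_events", unacknowledged))
```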

    <Info>
      **Time-based filtering with lastUpdatedTime**

      The `lastUpdatedTime` filter is **mandatory** for `filter` and `aggregate` queries on immutable streams. This filter defines the time range for retrieving records:

      * **`gt` (greater than)**: The start of the time range (required for immutable streams)
      * **`lt` (less than)**: The end of the time range (optional, defaults to current time if not specified)

      The maximum time range you can query **in a single request** is defined by the stream's `maxFilteringInterval` setting, which is determined by the [stream template](/cdf/dm/records/concepts/records_and_streams#stream-templates) you chose when the stream was created. This limit applies to the span between `gt` and `lt`, not to how far back in time you can reach: you can query any historical period as long as each request stays within the interval.

      To query data spanning more than this interval, split your requests into adjacent time windows. See [Query time range limits](/cdf/dm/records/concepts/records_and_streams#query-time-range-limits) for details and examples.

      For mutable streams, `lastUpdatedTime` is optional, but using it improves query performance.
    </Info>
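
The window-splitting approach described above can be sketched in Python. This is a minimal sketch under stated assumptions, not an official utility: `time_windows` and `to_iso` are hypothetical helpers, the 7-day interval in the usage is a placeholder for your stream's actual `maxFilteringInterval`, and it assumes `lastUpdatedTime` has millisecond precision. Because `gt` and `lt` are both exclusive, reusing one window's `lt` as the next window's `gt` would skip records stamped exactly on the boundary, so each `gt` is shifted 1 ms earlier.

```python
from datetime import datetime, timedelta, timezone
from typing import Iterator, Tuple

ONE_MS = timedelta(milliseconds=1)


def to_iso(dt: datetime) -> str:
    """Format an aware datetime as an ISO 8601 UTC string with milliseconds."""
    return dt.astimezone(timezone.utc).isoformat(timespec="milliseconds").replace("+00:00", "Z")


def time_windows(
    start: datetime, end: datetime, max_interval: timedelta
) -> Iterator[Tuple[datetime, datetime]]:
    """Yield (gt, lt) bounds that together cover [start, end).

    Each window spans at most max_interval. gt is placed 1 ms before the
    window start so that, assuming millisecond timestamps, a record stamped
    exactly on a boundary falls into exactly one window.
    """
    if max_interval <= ONE_MS:
        raise ValueError("max_interval must be longer than 1 ms")
    step = max_interval - ONE_MS  # leave room for the 1 ms shift on gt
    window_start = start
    while window_start < end:
        window_end = min(window_start + step, end)
        yield window_start - ONE_MS, window_end
        window_start = window_end


# Hypothetical example: 30 days of history, 7-day maxFilteringInterval
now = datetime.now(timezone.utc)
for gt, lt in time_windows(now - timedelta(days=30), now, timedelta(days=7)):
    # Merge with sources/filter/limit and send one request per window
    print({"lastUpdatedTime": {"gt": to_iso(gt), "lt": to_iso(lt)}})
```

Each request is still subject to the endpoint's own limits, such as the 1,000-record maximum on `filter`.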

    * **Use the `filter` endpoint** for interactive applications, dashboards, and any scenario where you expect a relatively small, bounded result set. It returns an unpaginated result set with a maximum of 1,000 records per request and supports custom sorting on any property.

      This endpoint is ideal for queries like "show me the top 10 unacknowledged alarms sorted by priority."

    <AccordionGroup>
      <Accordion title="Example: filter unacknowledged alarms by priority">
        <CodeGroup>
          ```bash curl theme={"languages":{"custom":["/_languages/kuiper.json","../_languages/kuiper.json"]}}
          curl -X POST \
            "https://${CLUSTER}.cognitedata.com/api/v1/projects/${PROJECT}/streams/${STREAM_ID}/records/filter" \
            -H "Content-Type: application/json" \
            -H "Authorization: Bearer ${TOKEN}" \
            -d '{
              "lastUpdatedTime": {
                "gt": "2025-10-23T00:00:00.000Z"
              },
              "sources": [{
                "source": {
                  "type": "container",
                  "space": "scada-alarms",
                  "externalId": "alarm_events"
                },
                "properties": ["*"]
              }],
              "filter": {
                "and": [
                  {
                    "hasData": [{
                      "type": "container",
                      "space": "scada-alarms",
                      "externalId": "alarm_events"
                    }]
                  },
                  {
                    "equals": {
                      "property": ["scada-alarms", "alarm_events", "is_acknowledged"],
                      "value": false
                    }
                  }
                ]
              },
              "sort": [{
                "property": ["scada-alarms", "alarm_events", "priority"],
                "direction": "descending"
              }],
              "limit": 10
            }'
          ```

          ```python Python theme={"languages":{"custom":["/_languages/kuiper.json","../_languages/kuiper.json"]}}
          from cognite.client import CogniteClient
          from datetime import datetime, timedelta, timezone
          from tabulate import tabulate

          client = CogniteClient()

          space_id = "scada-alarms"
          container_id = "alarm_events"
          mutable_stream_id = "alarm_data_live_stream"

          # Filter for unacknowledged alarms, sorted by priority descending
          response = client.post(
              url=f"/api/v1/projects/{client.config.project}/streams/{mutable_stream_id}/records/filter",
              json={
                  "lastUpdatedTime": {
                      # Use UTC, because the trailing "Z" marks the timestamp as UTC
                      "gt": (datetime.now(timezone.utc) - timedelta(days=1)).strftime("%Y-%m-%dT%H:%M:%S.000Z"),
                  },
                  "sources": [
                      {
                          "source": {
                              "type": "container",
                              "space": space_id,
                              "externalId": container_id,
                          },
                          "properties": ["*"],
                      }
                  ],
                  "filter": {
                      "and": [
                          {
                              "hasData": [
                                  {
                                      "type": "container",
                                      "space": space_id,
                                      "externalId": container_id,
                                  }
                              ]
                          },
                          {
                              "equals": {
                                  "property": [space_id, container_id, "is_acknowledged"],
                                  "value": False,
                              }
                          },
                      ]
                  },
                  "sort": [
                      {
                          "property": [space_id, container_id, "priority"],
                          "direction": "descending",
                      }
                  ],
                  "limit": 10,
              },
          )

          results = response.json()
          items = results.get("items", [])

          # Display as table
          table_data = []
          for item in items[:5]:  # Show first 5
              props = item["properties"][space_id][container_id]
              table_data.append([
                  props["alarm_id"],
                  props["severity"],
                  props["value"],
                  props["priority"],
                  props["timestamp"],
              ])

          print(f"Found {len(items)} unacknowledged alarms (showing first 5):")
          print(tabulate(
              table_data,
              headers=["Alarm ID", "Severity", "Value", "Priority", "Timestamp"],
              tablefmt="github",
          ))
          ```

          ```javascript JavaScript/TypeScript theme={"languages":{"custom":["/_languages/kuiper.json","../_languages/kuiper.json"]}}
          import { CogniteClient } from '@cognite/sdk';

          const client = new CogniteClient({ appId: 'records-tutorial' });

          const spaceId = 'scada-alarms';
          const containerId = 'alarm_events';
          const mutableStreamId = 'alarm_data_live_stream';

          // Filter for unacknowledged alarms, sorted by priority descending
          const oneDayAgo = new Date(Date.now() - 24 * 60 * 60 * 1000).toISOString();

          const filterResults = await client.records.filter(mutableStreamId, {
            lastUpdatedTime: {
              gt: oneDayAgo,
            },
            sources: [
              {
                source: {
                  type: 'container' as const,
                  space: spaceId,
                  externalId: containerId,
                },
                properties: ['*'],
              },
            ],
            filter: {
              and: [
                {
                  hasData: [
                    {
                      type: 'container' as const,
                      space: spaceId,
                      externalId: containerId,
                    },
                  ],
                },
                {
                  equals: {
                    property: [spaceId, containerId, 'is_acknowledged'],
                    value: false,
                  },
                },
              ],
            },
            sort: [
              {
                property: [spaceId, containerId, 'priority'],
                direction: 'descending',
              },
            ],
            limit: 10,
          });

          // Display first 5 results
          console.log(`Found ${filterResults.length} unacknowledged alarms (showing first 5):`);
          filterResults.slice(0, 5).forEach((item) => {
            const props = item.properties[spaceId][containerId];
            console.log(`${props.alarm_id} | ${props.severity} | ${props.value} | ${props.priority} | ${props.timestamp}`);
          });
          ```
        </CodeGroup>

        **Expected output:**

        ```
        Found 10 unacknowledged alarms (showing first 5):
        | Alarm ID        | Severity   |   Value |   Priority | Timestamp                |
        |-----------------|------------|---------|------------|--------------------------|
        | ALARM-TEMP-042  | CRITICAL   |   98.28 |        100 | 2025-10-24T12:04:54.000Z |
        | ALARM-TEMP-042  | LOW        |   71.82 |        100 | 2025-10-24T12:44:54.000Z |
        | ALARM-LEVEL-055 | LOW        |   69.19 |         99 | 2025-10-24T12:55:54.000Z |
        | ALARM-FLOW-013  | LOW        |   56.17 |         97 | 2025-10-24T12:50:54.000Z |
        | ALARM-FLOW-013  | HIGH       |  131.5  |         97 | 2025-10-24T11:45:54.000Z |
        ```
      </Accordion>
    </AccordionGroup>

    * **Use the `sync` endpoint** for batch processing, data exports, or any application that needs to retrieve a large number of records efficiently. It returns a paginated result set: you iterate through the data by sending follow-up requests with the cursor returned in the previous response. The sync is complete when a response has `hasNext` set to `false`.

      The sync endpoint supports a maximum of 1,000 records per page and doesn't support custom sorting. The read throughput is governed by the stream settings defined by the [stream template](/cdf/dm/records/concepts/records_and_streams#stream-templates) you chose when creating the stream.

      This endpoint is designed for workflows like "export all recent alarm events for analysis" and for implementing incremental data pipelines.

    <AccordionGroup>
      <Accordion title="Example: sync recent alarm records">
        <CodeGroup>
          ```bash curl theme={"languages":{"custom":["/_languages/kuiper.json","../_languages/kuiper.json"]}}
          curl -X POST \
            "https://${CLUSTER}.cognitedata.com/api/v1/projects/${PROJECT}/streams/${STREAM_ID}/records/sync" \
            -H "Content-Type: application/json" \
            -H "Authorization: Bearer ${TOKEN}" \
            -d '{
              "sources": [{
                "source": {
                  "type": "container",
                  "space": "scada-alarms",
                  "externalId": "alarm_events"
                },
                "properties": ["*"]
              }],
              "filter": {
                "hasData": [{
                  "type": "container",
                  "space": "scada-alarms",
                  "externalId": "alarm_events"
                }]
              },
              "initializeCursor": "2m-ago",
              "limit": 10
            }'
          ```

          ```python Python theme={"languages":{"custom":["/_languages/kuiper.json","../_languages/kuiper.json"]}}
          from cognite.client import CogniteClient
          from tabulate import tabulate

          client = CogniteClient()

          space_id = "scada-alarms"
          container_id = "alarm_events"
          mutable_stream_id = "alarm_data_live_stream"

          # Start the sync cursor 2 minutes ago and fetch the first page (up to 10 records)
          response = client.post(
              url=f"/api/v1/projects/{client.config.project}/streams/{mutable_stream_id}/records/sync",
              json={
                  "sources": [
                      {
                          "source": {
                              "type": "container",
                              "space": space_id,
                              "externalId": container_id,
                          },
                          "properties": ["*"],
                      }
                  ],
                  "filter": {
                      "hasData": [
                          {
                              "type": "container",
                              "space": space_id,
                              "externalId": container_id,
                          }
                      ]
                  },
                  "initializeCursor": "2m-ago",
                  "limit": 10,
              },
          )

          results = response.json()
          items = results.get("items", [])

          # Display as table
          table_data = []
          for item in items[:5]:  # Show first 5
              props = item["properties"][space_id][container_id]
              ack_status = "ACK" if props["is_acknowledged"] else "UNACK"
              table_data.append([
                  props["alarm_id"],
                  props["severity"],
                  ack_status,
                  props["value"],
                  props["timestamp"],
              ])

          print(f"Sync returned {len(items)} records (showing first 5):")
          print(tabulate(
              table_data,
              headers=["Alarm ID", "Severity", "Status", "Value", "Timestamp"],
              tablefmt="github",
          ))

          if "nextCursor" in results:
              cursor = results["nextCursor"]
              cursor_preview = f"{cursor[:5]}..." if len(cursor) > 5 else cursor
              print(f"\nNext cursor for incremental sync: {cursor_preview}")
          ```

          ```javascript JavaScript/TypeScript theme={"languages":{"custom":["/_languages/kuiper.json","../_languages/kuiper.json"]}}
          import { CogniteClient } from '@cognite/sdk';

          const client = new CogniteClient({ appId: 'records-tutorial' });

          const spaceId = 'scada-alarms';
          const containerId = 'alarm_events';
          const mutableStreamId = 'alarm_data_live_stream';

          // Start the sync cursor 2 minutes ago and fetch the first page (up to 10 records)
          const syncResponse = await client.records.sync(mutableStreamId, {
            sources: [
              {
                source: {
                  type: 'container' as const,
                  space: spaceId,
                  externalId: containerId,
                },
                properties: ['*'],
              },
            ],
            filter: {
              hasData: [
                {
                  type: 'container' as const,
                  space: spaceId,
                  externalId: containerId,
                },
              ],
            },
            initializeCursor: '2m-ago',
            limit: 10,
          });

          const syncItems = syncResponse.items;

          // Display first 5 results
          console.log(`Sync returned ${syncItems.length} records (showing first 5):`);
          syncItems.slice(0, 5).forEach((item) => {
            const props = item.properties[spaceId][containerId];
            const ackStatus = props.is_acknowledged ? 'ACK' : 'UNACK';
            console.log(`${props.alarm_id} | ${props.severity} | ${ackStatus} | ${props.value} | ${props.timestamp}`);
          });

          if (syncResponse.nextCursor) {
            const cursorPreview = syncResponse.nextCursor.length > 5
              ? `${syncResponse.nextCursor.substring(0, 5)}...`
              : syncResponse.nextCursor;
            console.log(`\nNext cursor for incremental sync: ${cursorPreview}`);
          }
          ```
        </CodeGroup>

        **Expected output:**

        ```
        Sync returned 10 records (showing first 5):
        | Alarm ID        | Severity   | Status   |   Value | Timestamp                |
        |-----------------|------------|----------|---------|--------------------------|
        | ALARM-PRESS-001 | CRITICAL   | UNACK    |  125.8  | 2025-10-22T10:00:00.000Z |
        | ALARM-TEMP-042  | LOW        | UNACK    |   98.78 | 2025-10-24T11:27:54.000Z |
        | ALARM-VIBR-088  | CRITICAL   | ACK      |  122.78 | 2025-10-24T11:28:54.000Z |
        | ALARM-VIBR-088  | MEDIUM     | ACK      |  140.56 | 2025-10-24T11:29:54.000Z |
        | ALARM-LEVEL-055 | CRITICAL   | ACK      |   50.59 | 2025-10-24T11:30:54.000Z |

        Next cursor for incremental sync: 08c0e...
        ```
      </Accordion>
    </AccordionGroup>
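
The sync examples fetch only the first page. To read everything, keep requesting pages until `hasNext` is `false`. Below is a minimal sketch: `sync_all` is a hypothetical helper, and it assumes that follow-up requests pass the previous `nextCursor` in a `cursor` field in place of `initializeCursor` (check the Records API reference for the exact field name). The HTTP call is passed in as a function so the loop can be exercised without a CDF connection.

```python
from typing import Any, Callable, Dict, List, Optional, Tuple


def sync_all(
    post_sync: Callable[[Dict[str, Any]], Dict[str, Any]],
    base_body: Dict[str, Any],
    initialize_cursor: str = "2m-ago",
) -> Tuple[List[Dict[str, Any]], Optional[str]]:
    """Follow sync cursors until hasNext is false.

    Returns all records plus the last cursor, which you can store and
    reuse on the next run for incremental sync.
    """
    records: List[Dict[str, Any]] = []
    body = {**base_body, "initializeCursor": initialize_cursor}
    while True:
        page = post_sync(body)
        records.extend(page.get("items", []))
        cursor = page.get("nextCursor")
        if not page.get("hasNext"):
            return records, cursor
        # Assumption: follow-up requests send the previous cursor as "cursor"
        body = {**base_body, "cursor": cursor}


# Usage with the Python SDK (sketch; base_body holds sources, filter, and limit):
# url = f"/api/v1/projects/{client.config.project}/streams/{mutable_stream_id}/records/sync"
# records, cursor = sync_all(lambda body: client.post(url, json=body).json(), base_body)
```

For large exports, consider processing each page as it arrives instead of collecting every record in memory.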

    * **Use the `aggregate` endpoint** to perform statistical analysis on your records, such as counts, averages, sums, and other aggregations. The endpoint is optimized for analytical queries that summarize large datasets without retrieving individual records: it returns only the results of the aggregation functions and groupings you specify.

      This endpoint is ideal for generating reports like "count of alarms by severity" or "average alarm value and priority statistics."
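
The "count of alarms by severity" report can be expressed as a single `uniqueValues` aggregation on the `severity` property. A minimal sketch: `severity_count_body` and `counts_by_severity` are hypothetical helpers, and the parsing assumes the `aggregates` / `uniqueValueBuckets` response shape (each bucket with `value` and `count`) that this step's per-alarm example reads.

```python
from typing import Any, Dict


def severity_count_body(space_id: str, container_id: str, gt_iso: str) -> Dict[str, Any]:
    """Build an aggregate request that counts records per distinct severity."""
    return {
        "lastUpdatedTime": {"gt": gt_iso},
        "filter": {
            "hasData": [{"type": "container", "space": space_id, "externalId": container_id}]
        },
        "aggregates": {
            "by_severity": {"uniqueValues": {"property": [space_id, container_id, "severity"]}}
        },
    }


def counts_by_severity(response_json: Dict[str, Any]) -> Dict[str, int]:
    """Turn the uniqueValues buckets into a {severity: count} mapping."""
    buckets = response_json["aggregates"]["by_severity"]["uniqueValueBuckets"]
    return {bucket["value"]: bucket["count"] for bucket in buckets}


# Usage with the Python SDK (sketch):
# body = severity_count_body("scada-alarms", "alarm_events", "2025-10-23T00:00:00.000Z")
# url = f"/api/v1/projects/{client.config.project}/streams/{mutable_stream_id}/records/aggregate"
# print(counts_by_severity(client.post(url, json=body).json()))
```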

    <AccordionGroup>
      <Accordion title="Example: overall alarm statistics">
        <CodeGroup>
          ```bash curl theme={"languages":{"custom":["/_languages/kuiper.json","../_languages/kuiper.json"]}}
          curl -X POST \
            "https://${CLUSTER}.cognitedata.com/api/v1/projects/${PROJECT}/streams/${STREAM_ID}/records/aggregate" \
            -H "Content-Type: application/json" \
            -H "Authorization: Bearer ${TOKEN}" \
            -d '{
              "lastUpdatedTime": { "gt": "2025-10-23T00:00:00.000Z" },
              "filter": {
                "hasData": [{
                  "type": "container",
                  "space": "scada-alarms",
                  "externalId": "alarm_events"
                }]
              },
              "aggregates": {
                "total_count": { "count": {} },
                "avg_value": { "avg": { "property": ["scada-alarms", "alarm_events", "value"] } },
                "min_value": { "min": { "property": ["scada-alarms", "alarm_events", "value"] } },
                "max_value": { "max": { "property": ["scada-alarms", "alarm_events", "value"] } },
                "avg_priority": { "avg": { "property": ["scada-alarms", "alarm_events", "priority"] } }
              }
            }'
          ```

          ```python Python theme={"languages":{"custom":["/_languages/kuiper.json","../_languages/kuiper.json"]}}
          from cognite.client import CogniteClient
          from datetime import datetime, timedelta, timezone

          client = CogniteClient()

          space_id = "scada-alarms"
          container_id = "alarm_events"
          mutable_stream_id = "alarm_data_live_stream"

          # Calculate overall statistics: count, average value, min/max values, average priority
          response = client.post(
              url=f"/api/v1/projects/{client.config.project}/streams/{mutable_stream_id}/records/aggregate",
              json={
                  "lastUpdatedTime": {
                      # Use UTC, because the trailing "Z" marks the timestamp as UTC
                      "gt": (datetime.now(timezone.utc) - timedelta(days=1)).strftime("%Y-%m-%dT%H:%M:%S.000Z"),
                  },
                  "filter": {
                      "hasData": [
                          {
                              "type": "container",
                              "space": space_id,
                              "externalId": container_id,
                          }
                      ]
                  },
                  "aggregates": {
                      "total_count": {"count": {}},
                      "avg_value": {"avg": {"property": [space_id, container_id, "value"]}},
                      "min_value": {"min": {"property": [space_id, container_id, "value"]}},
                      "max_value": {"max": {"property": [space_id, container_id, "value"]}},
                      "avg_priority": {"avg": {"property": [space_id, container_id, "priority"]}},
                  },
              },
          )

          results = response.json()
          aggs = results["aggregates"]

          print("Overall Statistics:")
          print(f"  Total Records: {aggs['total_count']['count']}")
          print(f"  Average Value: {aggs['avg_value']['avg']:.2f}")
          print(f"  Min Value: {aggs['min_value']['min']:.2f}")
          print(f"  Max Value: {aggs['max_value']['max']:.2f}")
          print(f"  Average Priority: {aggs['avg_priority']['avg']:.1f}")
          ```

          ```javascript JavaScript/TypeScript theme={"languages":{"custom":["/_languages/kuiper.json","../_languages/kuiper.json"]}}
          import { CogniteClient } from '@cognite/sdk';

          const client = new CogniteClient({ appId: 'records-tutorial' });

          const spaceId = 'scada-alarms';
          const containerId = 'alarm_events';
          const mutableStreamId = 'alarm_data_live_stream';

          // Calculate overall statistics
          const oneDayAgo = new Date(Date.now() - 24 * 60 * 60 * 1000).toISOString();

          const aggResults = await client.records.aggregate(mutableStreamId, {
            lastUpdatedTime: {
              gt: oneDayAgo,
            },
            filter: {
              hasData: [
                {
                  type: 'container' as const,
                  space: spaceId,
                  externalId: containerId,
                },
              ],
            },
            aggregates: {
              total_count: { count: {} },
              avg_value: { avg: { property: [spaceId, containerId, 'value'] } },
              min_value: { min: { property: [spaceId, containerId, 'value'] } },
              max_value: { max: { property: [spaceId, containerId, 'value'] } },
              avg_priority: { avg: { property: [spaceId, containerId, 'priority'] } },
            },
          });

          console.log('Overall Statistics:');
          console.log(`  Total Records: ${aggResults.total_count.count}`);
          console.log(`  Average Value: ${aggResults.avg_value.avg.toFixed(2)}`);
          console.log(`  Min Value: ${aggResults.min_value.min.toFixed(2)}`);
          console.log(`  Max Value: ${aggResults.max_value.max.toFixed(2)}`);
          console.log(`  Average Priority: ${aggResults.avg_priority.avg.toFixed(1)}`);
          ```
        </CodeGroup>

        **Expected output:**

        ```
        Overall Statistics:
          Total Records: 102
          Average Value: 95.87
          Min Value: 50.15
          Max Value: 149.39
          Average Priority: 63.0
        ```
      </Accordion>

      <Accordion title="Example: per-alarm statistics with nested aggregations">
        <CodeGroup>
          ```bash curl theme={"languages":{"custom":["/_languages/kuiper.json","../_languages/kuiper.json"]}}
          curl -X POST \
            "https://${CLUSTER}.cognitedata.com/api/v1/projects/${PROJECT}/streams/${STREAM_ID}/records/aggregate" \
            -H "Content-Type: application/json" \
            -H "Authorization: Bearer ${TOKEN}" \
            -d '{
              "lastUpdatedTime": { "gt": "2025-10-23T00:00:00.000Z" },
              "filter": {
                "hasData": [{
                  "type": "container",
                  "space": "scada-alarms",
                  "externalId": "alarm_events"
                }]
              },
              "aggregates": {
                "by_alarm": {
                  "uniqueValues": {
                    "property": ["scada-alarms", "alarm_events", "alarm_id"],
                    "aggregates": {
                      "avg_value": {
                        "avg": { "property": ["scada-alarms", "alarm_events", "value"] }
                      },
                      "by_severity": {
                        "uniqueValues": {
                          "property": ["scada-alarms", "alarm_events", "severity"]
                        }
                      }
                    }
                  }
                }
              }
            }'
          ```

          ```python Python theme={"languages":{"custom":["/_languages/kuiper.json","../_languages/kuiper.json"]}}
          from cognite.client import CogniteClient
          from datetime import datetime, timedelta, timezone
          from tabulate import tabulate

          client = CogniteClient()

          space_id = "scada-alarms"
          container_id = "alarm_events"
          mutable_stream_id = "alarm_data_live_stream"

          # Group by alarm_id and calculate statistics for each alarm
          response = client.post(
              url=f"/api/v1/projects/{client.config.project}/streams/{mutable_stream_id}/records/aggregate",
              json={
                  "lastUpdatedTime": {
                      # Use UTC, because the trailing "Z" marks the timestamp as UTC
                      "gt": (datetime.now(timezone.utc) - timedelta(days=1)).strftime("%Y-%m-%dT%H:%M:%S.000Z"),
                  },
                  "filter": {
                      "hasData": [
                          {
                              "type": "container",
                              "space": space_id,
                              "externalId": container_id,
                          }
                      ]
                  },
                  "aggregates": {
                      "by_alarm": {
                          "uniqueValues": {
                              "property": [space_id, container_id, "alarm_id"],
                              "aggregates": {
                                  "avg_value": {
                                      "avg": {"property": [space_id, container_id, "value"]}
                                  },
                                  "by_severity": {
                                      "uniqueValues": {
                                          "property": [space_id, container_id, "severity"]
                                      }
                                  },
                              },
                          }
                      }
                  },
              },
          )

          results = response.json()
          buckets = results["aggregates"]["by_alarm"]["uniqueValueBuckets"]

          print("Per-Alarm Statistics:")
          table_data = []
          for bucket in buckets:
              alarm = bucket["value"]
              count = bucket["count"]
              avg_value = bucket["aggregates"]["avg_value"]["avg"]

              # Get most common severity from nested uniqueValues buckets
              severity_buckets = bucket["aggregates"]["by_severity"]["uniqueValueBuckets"]
              most_common_severity = (
                  max(severity_buckets, key=lambda x: x["count"])["value"]
                  if severity_buckets
                  else "N/A"
              )

              table_data.append([alarm, count, f"{avg_value:.2f}", most_common_severity])

          print(tabulate(
              table_data,
              headers=["Alarm ID", "Occurrences", "Avg Value", "Most Common Severity"],
              tablefmt="github",
          ))
          ```

          ```javascript JavaScript/TypeScript theme={"languages":{"custom":["/_languages/kuiper.json","../_languages/kuiper.json"]}}
          import { CogniteClient } from '@cognite/sdk';

          const client = new CogniteClient({ appId: 'records-tutorial' });

          const spaceId = 'scada-alarms';
          const containerId = 'alarm_events';
          const mutableStreamId = 'alarm_data_live_stream';

          // Group by alarm_id and calculate statistics for each alarm
          const oneDayAgo = new Date(Date.now() - 24 * 60 * 60 * 1000).toISOString();

          const groupbyResults = await client.records.aggregate(mutableStreamId, {
            lastUpdatedTime: {
              gt: oneDayAgo,
            },
            filter: {
              hasData: [
                {
                  type: 'container' as const,
                  space: spaceId,
                  externalId: containerId,
                },
              ],
            },
            aggregates: {
              by_alarm: {
                uniqueValues: {
                  property: [spaceId, containerId, 'alarm_id'],
                  aggregates: {
                    avg_value: {
                      avg: { property: [spaceId, containerId, 'value'] },
                    },
                    by_severity: {
                      uniqueValues: {
                        property: [spaceId, containerId, 'severity'],
                      },
                    },
                  },
                },
              },
            },
          });

          console.log('Per-Alarm Statistics:');
          const buckets = groupbyResults.by_alarm.uniqueValueBuckets;
          buckets.forEach((bucket) => {
            const alarm = bucket.value;
            const count = bucket.count;
            const avgValue = bucket.aggregates.avg_value.avg;

            // Get most common severity from nested uniqueValues buckets
            const severityBuckets = bucket.aggregates.by_severity.uniqueValueBuckets;
            const mostCommonSeverity = severityBuckets.length > 0
              ? severityBuckets.reduce((max, b) => (b.count > max.count ? b : max)).value
              : 'N/A';

            console.log(`${alarm} | ${count} | ${avgValue.toFixed(2)} | ${mostCommonSeverity}`);
          });
          ```
        </CodeGroup>

        **Expected output:**

        ```
        Per-Alarm Statistics:
        | Alarm ID        |   Occurrences |   Avg Value | Most Common Severity   |
        |-----------------|---------------|-------------|------------------------|
        | ALARM-FLOW-013  |            26 |       91.50 | LOW                    |
        | ALARM-TEMP-042  |            24 |       92.15 | CRITICAL               |
        | ALARM-PRESS-001 |            19 |      104.62 | CRITICAL               |
        | ALARM-VIBR-088  |            17 |      106.93 | LOW                    |
        | ALARM-LEVEL-055 |            16 |       86.39 | CRITICAL               |
        ```
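        Both examples reduce the nested `uniqueValues` buckets inline. If you reuse this grouping in several reports, you can move that reduction into a small helper. The sketch below assumes the response shape that the Python example above reads (`aggregates` → `by_alarm` → `uniqueValueBuckets`, with nested `avg_value` and `by_severity` results in each bucket). `summarize_alarm_buckets` is a hypothetical name, and the sample response is made-up illustration data, not real output.

        ```python
        from typing import Any


        def summarize_alarm_buckets(response: dict[str, Any]) -> list[tuple[str, int, float, str]]:
            """Flatten a by_alarm aggregate (with nested avg_value and by_severity) into rows."""
            rows = []
            for bucket in response["aggregates"]["by_alarm"]["uniqueValueBuckets"]:
                severity_buckets = bucket["aggregates"]["by_severity"]["uniqueValueBuckets"]
                # max() returns the first bucket among ties, so ties follow response order
                most_common = (
                    max(severity_buckets, key=lambda b: b["count"])["value"]
                    if severity_buckets
                    else "N/A"
                )
                avg_value = round(bucket["aggregates"]["avg_value"]["avg"], 2)
                rows.append((bucket["value"], bucket["count"], avg_value, most_common))
            # Highest occurrence count first
            rows.sort(key=lambda row: row[1], reverse=True)
            return rows


        # Made-up sample shaped like the aggregate response (illustration only)
        sample = {
            "aggregates": {
                "by_alarm": {
                    "uniqueValueBuckets": [
                        {
                            "value": "ALARM-A",
                            "count": 3,
                            "aggregates": {
                                "avg_value": {"avg": 91.5},
                                "by_severity": {
                                    "uniqueValueBuckets": [
                                        {"value": "LOW", "count": 1},
                                        {"value": "CRITICAL", "count": 2},
                                    ]
                                },
                            },
                        },
                        {
                            "value": "ALARM-B",
                            "count": 5,
                            "aggregates": {
                                "avg_value": {"avg": 100.0},
                                "by_severity": {"uniqueValueBuckets": []},
                            },
                        },
                    ]
                }
            }
        }

        print(summarize_alarm_buckets(sample))
        # → [('ALARM-B', 5, 100.0, 'N/A'), ('ALARM-A', 3, 91.5, 'CRITICAL')]
        ```

        Returning plain tuples keeps the helper independent of any table library, so you can pass the rows to `tabulate` as the example does or write them somewhere else.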
      </Accordion>
    </AccordionGroup>
  </Step>

  <Step title="Archive acknowledged alarms (stream-to-stream pattern)">
    The final step of this tutorial is to implement a common Records pattern: moving data between streams based on lifecycle stages. You'll archive acknowledged alarms by moving them from the mutable stream to the immutable stream. This stream-to-stream pattern is useful for:

    * **Hot/cold storage**: keep active data in mutable streams, archive historical data in immutable streams
    * **Data lifecycle management**: move processed records to long-term storage automatically
    * **Performance optimization**: keep active streams lean by moving old data to archives

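    The key to reliable stream-to-stream operations is the `sync` endpoint with cursor-based pagination. The cursor tracks your position in the stream, so you can resume from where you left off if processing fails or is interrupted. The examples below advance the cursor only after a batch has been both ingested into the immutable stream and deleted from the mutable stream. On the first run, they initialize the cursor with `initializeCursor: "2m-ago"`, so they only pick up records changed in the last two minutes; if more time has passed since you updated the alarms, adjust this value.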
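    The archive examples below keep the cursor in a variable, so it is lost when the script exits. One minimal way to persist it between runs is a small JSON checkpoint file. This sketch assumes a local file is acceptable for your deployment; the file name and the `load_cursor` and `save_cursor` helpers are hypothetical, and a production job might keep the cursor in a database or other durable store instead.

    ```python
    import json
    import os
    import tempfile
    from pathlib import Path
    from typing import Optional

    # Hypothetical checkpoint location; use durable storage in a real deployment
    CHECKPOINT_FILE = Path("archive_cursor.json")


    def load_cursor(path: Path = CHECKPOINT_FILE) -> Optional[str]:
        """Return the last saved sync cursor, or None if no run has completed yet."""
        if not path.exists():
            return None
        return json.loads(path.read_text())["cursor"]


    def save_cursor(cursor: str, path: Path = CHECKPOINT_FILE) -> None:
        """Persist the cursor so an interrupted write never leaves a half-written file."""
        # Write to a temporary file in the same directory, then rename it over the target
        fd, tmp_name = tempfile.mkstemp(dir=path.parent, suffix=".tmp")
        with os.fdopen(fd, "w") as tmp:
            json.dump({"cursor": cursor}, tmp)
        os.replace(tmp_name, path)
    ```

    To use it with the Python example, start with `stored_cursor = load_cursor()` instead of `None`, and call `save_cursor(next_cursor)` at each point where the loop assigns `stored_cursor = next_cursor`. Writing to a temporary file and renaming it means a reader sees either the old cursor or the new one, never a partial file.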

    <AccordionGroup>
      <Accordion title="Example: move acknowledged alarms with pagination">
        <CodeGroup>
          ```python Python theme={"languages":{"custom":["/_languages/kuiper.json","../_languages/kuiper.json"]}}
          from cognite.client import CogniteClient

          client = CogniteClient()

          space_id = "scada-alarms"
          container_id = "alarm_events"
          mutable_stream_id = "alarm_data_live_stream"
          immutable_stream_id = "alarm_data_archive_stream"

          # In a production system, you would persist this cursor between runs
          stored_cursor = None

          # Pagination settings
          BATCH_SIZE = 10
          total_records_moved = 0
          batch_number = 0

          print("Starting pagination loop to archive acknowledged alarms...")

          # Keep paginating until there are no more acknowledged records
          while True:
              batch_number += 1
              print(f"\n--- Batch {batch_number} ---")

              # Sync acknowledged records from MUTABLE stream
              sync_response = client.post(
                  url=f"/api/v1/projects/{client.config.project}/streams/{mutable_stream_id}/records/sync",
                  json={
                      "sources": [
                          {
                              "source": {
                                  "type": "container",
                                  "space": space_id,
                                  "externalId": container_id,
                              },
                              "properties": ["*"],
                          }
                      ],
                      # Use stored cursor if available, otherwise initialize from 2 minutes ago
                      **(
                          {"cursor": stored_cursor}
                          if stored_cursor
                          else {"initializeCursor": "2m-ago"}
                      ),
                      "filter": {
                          "and": [
                              {
                                  "hasData": [
                                      {
                                          "type": "container",
                                          "space": space_id,
                                          "externalId": container_id,
                                      }
                                  ]
                              },
                              {
                                  "equals": {
                                      "property": [space_id, container_id, "is_acknowledged"],
                                      "value": True,
                                  }
                              },
                          ]
                      },
                      "limit": BATCH_SIZE,
                  },
              )

              sync_results = sync_response.json()
              acknowledged_records = sync_results.get("items", [])
              next_cursor = sync_results["nextCursor"]
              has_next = sync_results["hasNext"]

              print(f"Found {len(acknowledged_records)} acknowledged records in this batch")
              print(f"More data immediately available: {has_next}")

              # If no records found, we're done
              if len(acknowledged_records) == 0:
                  print("No more acknowledged records to move")
                  stored_cursor = next_cursor  # Update cursor even when no records found
                  break

              # Prepare records for ingestion into immutable stream
              print(f"Ingesting {len(acknowledged_records)} records to IMMUTABLE stream...")

              immutable_items = []
              records_to_delete = []

              for record in acknowledged_records:
                  # Extract properties and identifiers
                  record_space = record["space"]
                  record_external_id = record["externalId"]
                  props = record["properties"][space_id][container_id]

                  # Store record identifier for deletion
                  records_to_delete.append({
                      "space": record_space,
                      "externalId": record_external_id,
                  })

                  # Prepare for immutable stream
                  immutable_items.append({
                      "space": record_space,
                      "externalId": record_external_id,
                      "sources": [
                          {
                              "source": {
                                  "type": "container",
                                  "space": space_id,
                                  "externalId": container_id,
                              },
                              "properties": props,
                          }
                      ],
                  })

              # Ingest into immutable stream
              immutable_res = client.post(
                  url=f"/api/v1/projects/{client.config.project}/streams/{immutable_stream_id}/records",
                  json={"items": immutable_items},
              )

              # Check if ingest was successful (200/201: sync success, 202: async processing)
              if immutable_res.status_code not in [200, 201, 202]:
                  print(f"Warning: Immutable ingest returned status {immutable_res.status_code}")
                  print(f"Response: {immutable_res.text}")
                  break
              else:
                  print(f"✓ Ingested {len(immutable_items)} records to IMMUTABLE stream")

              # Delete acknowledged records from MUTABLE stream
              print(f"Deleting {len(records_to_delete)} records from MUTABLE stream...")

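              delete_res = client.post(
                  url=f"/api/v1/projects/{client.config.project}/streams/{mutable_stream_id}/records/delete",
                  json={"items": records_to_delete},
              )

              # Stop without advancing the cursor if the delete did not succeed
              if delete_res.status_code not in [200, 201, 202]:
                  print(f"Warning: Mutable delete returned status {delete_res.status_code}")
                  print(f"Response: {delete_res.text}")
                  break
              print(f"✓ Deleted {len(records_to_delete)} records from MUTABLE stream")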

              # IMPORTANT: Only update cursor after BOTH operations succeed
              stored_cursor = next_cursor
              total_records_moved += len(acknowledged_records)

              cursor_preview = f"{stored_cursor[:20]}..." if len(stored_cursor) > 20 else stored_cursor
              print(f"✓ Cursor updated: {cursor_preview}")
              print(f"Total records moved so far: {total_records_moved}")

          print(f"\n{'=' * 70}")
          print("PAGINATION COMPLETE")
          print(f"{'=' * 70}")
          print(f"Total batches processed: {batch_number}")
          print(f"Total records moved: {total_records_moved}")
          print(f"{'=' * 70}")
          ```

          ```javascript JavaScript/TypeScript theme={"languages":{"custom":["/_languages/kuiper.json","../_languages/kuiper.json"]}}
          import { CogniteClient } from '@cognite/sdk';

          const client = new CogniteClient({ appId: 'records-tutorial' });

          const spaceId = 'scada-alarms';
          const containerId = 'alarm_events';
          const mutableStreamId = 'alarm_data_live_stream';
          const immutableStreamId = 'alarm_data_archive_stream';

          // In a production system, you would persist this cursor between runs
          let storedCursor: string | null = null;

          // Pagination settings
          const BATCH_SIZE = 10;
          let totalRecordsMoved = 0;
          let batchNumber = 0;

          console.log('Starting pagination loop to archive acknowledged alarms...');

          // Keep paginating until there are no more acknowledged records
          while (true) {
            batchNumber++;
            console.log(`\n--- Batch ${batchNumber} ---`);

            const syncResult = await client.records
              .sync(mutableStreamId, {
                sources: [
                  {
                    source: {
                      type: 'container' as const,
                      space: spaceId,
                      externalId: containerId,
                    },
                    properties: ['*'],
                  },
                ],
                // Use stored cursor if available, otherwise initialize from 2 minutes ago
                ...(storedCursor
                  ? { cursor: storedCursor }
                  : { initializeCursor: '2m-ago' }),
                filter: {
                  and: [
                    {
                      hasData: [
                        {
                          type: 'container' as const,
                          space: spaceId,
                          externalId: containerId,
                        },
                      ],
                    },
                    {
                      equals: {
                        property: [spaceId, containerId, 'is_acknowledged'],
                        value: true,
                      },
                    },
                  ],
                },
                limit: BATCH_SIZE,
              });

            const acknowledgedRecords = syncResult.items;
            const nextCursor = syncResult.nextCursor;
            const hasNext = syncResult.hasNext;

            console.log(`Found ${acknowledgedRecords.length} acknowledged records in this batch`);
            console.log(`More data immediately available: ${hasNext}`);

            // If no records found, we're done
            if (acknowledgedRecords.length === 0) {
              console.log('No more acknowledged records to move');
              storedCursor = nextCursor;
              break;
            }

            // Prepare acknowledged records for ingestion into immutable stream
            const immutableItems = [];
            const recordsToDelete = [];

            for (const record of acknowledgedRecords) {
              const recordSpace = record.space;
              const recordExternalId = record.externalId;
              const props = record.properties![spaceId][containerId];

              recordsToDelete.push({
                space: recordSpace,
                externalId: recordExternalId,
              });

              immutableItems.push({
                space: recordSpace,
                externalId: recordExternalId,
                sources: [
                  {
                    source: {
                      type: 'container' as const,
                      space: spaceId,
                      externalId: containerId,
                    },
                    properties: props,
                  },
                ],
              });
            }

            // Ingest into immutable stream
            console.log(`Ingesting ${immutableItems.length} records to IMMUTABLE stream...`);
            await client.records.ingest(immutableStreamId, immutableItems);
            console.log(`✓ Ingested ${immutableItems.length} records to IMMUTABLE stream`);

            // Delete from mutable stream
            console.log(`Deleting ${recordsToDelete.length} records from MUTABLE stream...`);
            await client.records.delete(mutableStreamId, recordsToDelete);
            console.log(`✓ Deleted ${recordsToDelete.length} records from MUTABLE stream`);

            // IMPORTANT: Only update the cursor after BOTH operations succeed
            // This ensures that if either ingest or delete fails, we can retry from the same position
            storedCursor = nextCursor;
            totalRecordsMoved += acknowledgedRecords.length;

            const cursorPreview = storedCursor.length > 20
              ? `${storedCursor.substring(0, 20)}...`
              : storedCursor;
            console.log(`✓ Cursor updated: ${cursorPreview}`);
            console.log(`Total records moved so far: ${totalRecordsMoved}`);
          }

          console.log('\n' + '='.repeat(70));
          console.log('PAGINATION COMPLETE');
          console.log('='.repeat(70));
          console.log(`Total batches processed: ${batchNumber}`);
          console.log(`Total records moved: ${totalRecordsMoved}`);
          console.log(`Batch size used: ${BATCH_SIZE}`);
          console.log('='.repeat(70));
          ```
        </CodeGroup>

        **Expected output:**

        ```
        Starting pagination loop to archive acknowledged alarms...

        --- Batch 1 ---
        Found 10 acknowledged records in this batch
        More data immediately available: True
        Ingesting 10 records to IMMUTABLE stream...
        ✓ Ingested 10 records to IMMUTABLE stream
        Deleting 10 records from MUTABLE stream...
        ✓ Deleted 10 records from MUTABLE stream
        ✓ Cursor updated: 08c0b89cecbeb1dab818...
        Total records moved so far: 10

        --- Batch 2 ---
        Found 10 acknowledged records in this batch
        More data immediately available: True
        Ingesting 10 records to IMMUTABLE stream...
        ✓ Ingested 10 records to IMMUTABLE stream
        Deleting 10 records from MUTABLE stream...
        ✓ Deleted 10 records from MUTABLE stream
        ✓ Cursor updated: 08c0aae0c3c0b1dab818...
        Total records moved so far: 20

        --- Batch 3 ---
        Found 10 acknowledged records in this batch
        More data immediately available: True
        Ingesting 10 records to IMMUTABLE stream...
        ✓ Ingested 10 records to IMMUTABLE stream
        Deleting 10 records from MUTABLE stream...
        ✓ Deleted 10 records from MUTABLE stream
        ✓ Cursor updated: 08c0e5b4cdc4b1dab818...
        Total records moved so far: 30

        [... batches 4-6 similar, each moving 10 records ...]

        --- Batch 7 ---
        Found 4 acknowledged records in this batch
        More data immediately available: False
        Ingesting 4 records to IMMUTABLE stream...
        ✓ Ingested 4 records to IMMUTABLE stream
        Deleting 4 records from MUTABLE stream...
        ✓ Deleted 4 records from MUTABLE stream
        ✓ Cursor updated: 0880beeae1c9b1dab818...
        Total records moved so far: 64

        --- Batch 8 ---
        Found 0 acknowledged records in this batch
        More data immediately available: False
        No more acknowledged records to move

        ======================================================================
        PAGINATION COMPLETE
        ======================================================================
        Total batches processed: 8
        Total records moved: 64
        ======================================================================
        ```
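        The comments in the archive loop note that the cursor only advances after both the ingest and the delete succeed, so an interrupted batch can be retried from the same position. If you want the script to retry transient failures itself instead of stopping, you can wrap each call in a small backoff helper. The sketch below is generic Python: `with_retries`, the attempt count, and the delays are hypothetical choices, and you should decide which exceptions count as transient for your client rather than retrying everything.

        ```python
        import time
        from typing import Callable, Tuple, Type, TypeVar

        T = TypeVar("T")


        def with_retries(
            operation: Callable[[], T],
            attempts: int = 4,
            base_delay: float = 1.0,
            retry_on: Tuple[Type[BaseException], ...] = (Exception,),
            sleep: Callable[[float], None] = time.sleep,
        ) -> T:
            """Call operation(), retrying failures with exponential backoff.

            Waits base_delay, 2 * base_delay, 4 * base_delay, ... between attempts
            and re-raises the last error once all attempts are used up.
            """
            if attempts < 1:
                raise ValueError("attempts must be at least 1")
            for attempt in range(1, attempts + 1):
                try:
                    return operation()
                except retry_on:
                    if attempt == attempts:
                        # Out of attempts: surface the error so the loop stops
                        # before the cursor is advanced
                        raise
                    sleep(base_delay * 2 ** (attempt - 1))
            raise AssertionError("unreachable")
        ```

        In the Python loop, you could wrap the ingest as `with_retries(lambda: client.post(url=ingest_url, json={"items": immutable_items}))`, where `ingest_url` is a hypothetical variable holding the same URL the example builds inline, and wrap the delete the same way. A retried ingest whose first attempt actually reached the service may write the same records again, so check how your streams handle repeated external IDs before enabling this.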
      </Accordion>

      <Accordion title="Example: verify final distribution across streams">
        After archiving, verify that the records were moved correctly by querying both streams to see the final distribution of acknowledged and unacknowledged alarms.

        <CodeGroup>
          ```bash curl theme={"languages":{"custom":["/_languages/kuiper.json","../_languages/kuiper.json"]}}
          # Query mutable stream
          curl -X POST \
            "https://${CLUSTER}.cognitedata.com/api/v1/projects/${PROJECT}/streams/alarm_data_live_stream/records/aggregate" \
            -H "Content-Type: application/json" \
            -H "Authorization: Bearer ${TOKEN}" \
            -d '{
              "lastUpdatedTime": { "gt": "2025-10-23T00:00:00.000Z" },
              "filter": {
                "hasData": [{
                  "type": "container",
                  "space": "scada-alarms",
                  "externalId": "alarm_events"
                }]
              },
              "aggregates": {
                "total_count": { "count": {} },
                "by_acknowledged": {
                  "uniqueValues": {
                    "property": ["scada-alarms", "alarm_events", "is_acknowledged"]
                  }
                }
              }
            }'

          # Query immutable stream
          curl -X POST \
            "https://${CLUSTER}.cognitedata.com/api/v1/projects/${PROJECT}/streams/alarm_data_archive_stream/records/aggregate" \
            -H "Content-Type: application/json" \
            -H "Authorization: Bearer ${TOKEN}" \
            -d '{
              "lastUpdatedTime": { "gt": "2025-10-23T00:00:00.000Z" },
              "filter": {
                "hasData": [{
                  "type": "container",
                  "space": "scada-alarms",
                  "externalId": "alarm_events"
                }]
              },
              "aggregates": {
                "total_count": { "count": {} },
                "by_acknowledged": {
                  "uniqueValues": {
                    "property": ["scada-alarms", "alarm_events", "is_acknowledged"]
                  }
                }
              }
            }'
          ```

          ```python Python theme={"languages":{"custom":["/_languages/kuiper.json","../_languages/kuiper.json"]}}
          from cognite.client import CogniteClient
          from datetime import datetime, timedelta, timezone

          client = CogniteClient()

          space_id = "scada-alarms"
          container_id = "alarm_events"
          mutable_stream_id = "alarm_data_live_stream"
          immutable_stream_id = "alarm_data_archive_stream"

          # Compute the cutoff once, in UTC, so the trailing "Z" in the timestamp is accurate
          one_day_ago = (datetime.now(timezone.utc) - timedelta(days=1)).strftime("%Y-%m-%dT%H:%M:%S.000Z")

          # Query mutable stream for distribution
          mutable_response = client.post(
              url=f"/api/v1/projects/{client.config.project}/streams/{mutable_stream_id}/records/aggregate",
              json={
                  "lastUpdatedTime": {
                      "gt": one_day_ago,
                  },
                  "filter": {
                      "hasData": [
                          {
                              "type": "container",
                              "space": space_id,
                              "externalId": container_id,
                          }
                      ]
                  },
                  "aggregates": {
                      "total_count": {"count": {}},
                      "by_acknowledged": {
                          "uniqueValues": {
                              "property": [space_id, container_id, "is_acknowledged"]
                          }
                      },
                  },
              },
          )

          mutable_results = mutable_response.json()
          print("MUTABLE Stream Statistics:")
          print(f"  Total Records: {mutable_results['aggregates']['total_count']['count']}")

          mutable_buckets = mutable_results["aggregates"]["by_acknowledged"]["uniqueValueBuckets"]
          for bucket in mutable_buckets:
              is_ack = bucket["value"] == "true" if isinstance(bucket["value"], str) else bucket["value"]
              ack_status = "Acknowledged (True)" if is_ack else "Unacknowledged (False)"
              print(f"  {ack_status}: {bucket['count']} records")

          # Query immutable stream for distribution
          immutable_response = client.post(
              url=f"/api/v1/projects/{client.config.project}/streams/{immutable_stream_id}/records/aggregate",
              json={
                  "lastUpdatedTime": {
                      "gt": one_day_ago,
                  },
                  "filter": {
                      "hasData": [
                          {
                              "type": "container",
                              "space": space_id,
                              "externalId": container_id,
                          }
                      ]
                  },
                  "aggregates": {
                      "total_count": {"count": {}},
                      "by_acknowledged": {
                          "uniqueValues": {
                              "property": [space_id, container_id, "is_acknowledged"]
                          }
                      },
                  },
              },
          )

          immutable_results = immutable_response.json()
          print("\nIMMUTABLE Stream Statistics:")
          print(f"  Total Records: {immutable_results['aggregates']['total_count']['count']}")

          immutable_buckets = immutable_results["aggregates"]["by_acknowledged"]["uniqueValueBuckets"]
          for bucket in immutable_buckets:
              is_ack = bucket["value"] == "true" if isinstance(bucket["value"], str) else bucket["value"]
              ack_status = "Acknowledged (True)" if is_ack else "Unacknowledged (False)"
              print(f"  {ack_status}: {bucket['count']} records")
          ```

          ```javascript JavaScript/TypeScript theme={"languages":{"custom":["/_languages/kuiper.json","../_languages/kuiper.json"]}}
          import { CogniteClient } from '@cognite/sdk';

          const client = new CogniteClient({ appId: 'records-tutorial' });

          const spaceId = 'scada-alarms';
          const containerId = 'alarm_events';
          const mutableStreamId = 'alarm_data_live_stream';
          const immutableStreamId = 'alarm_data_archive_stream';

          const oneDayAgo = new Date(Date.now() - 24 * 60 * 60 * 1000).toISOString();

          // Query mutable stream for distribution
          console.log('MUTABLE Stream Statistics:');
          const mutableAggResults = await client.records.aggregate(mutableStreamId, {
            lastUpdatedTime: {
              gt: oneDayAgo,
            },
            filter: {
              hasData: [
                {
                  type: 'container' as const,
                  space: spaceId,
                  externalId: containerId,
                },
              ],
            },
            aggregates: {
              total_count: { count: {} },
              by_acknowledged: {
                uniqueValues: {
                  property: [spaceId, containerId, 'is_acknowledged'],
                },
              },
            },
          });

          console.log(`  Total Records: ${mutableAggResults.total_count.count}`);

          const mutableBuckets = mutableAggResults.by_acknowledged.uniqueValueBuckets;
          mutableBuckets.forEach((bucket) => {
            const isAck = bucket.value === 'true' || bucket.value === true;
            const ackStatus = isAck ? 'Acknowledged (True)' : 'Unacknowledged (False)';
            console.log(`  ${ackStatus}: ${bucket.count} records`);
          });

          // Query immutable stream for distribution
          console.log('\nIMMUTABLE Stream Statistics:');
          const immutableAggResults = await client.records.aggregate(immutableStreamId, {
            lastUpdatedTime: {
              gt: oneDayAgo,
            },
            filter: {
              hasData: [
                {
                  type: 'container' as const,
                  space: spaceId,
                  externalId: containerId,
                },
              ],
            },
            aggregates: {
              total_count: { count: {} },
              by_acknowledged: {
                uniqueValues: {
                  property: [spaceId, containerId, 'is_acknowledged'],
                },
              },
            },
          });

          console.log(`  Total Records: ${immutableAggResults.total_count.count}`);

          const immutableBuckets = immutableAggResults.by_acknowledged.uniqueValueBuckets;
          immutableBuckets.forEach((bucket) => {
            const isAck = bucket.value === 'true' || bucket.value === true;
            const ackStatus = isAck ? 'Acknowledged (True)' : 'Unacknowledged (False)';
            console.log(`  ${ackStatus}: ${bucket.count} records`);
          });
          ```
        </CodeGroup>

        **Expected output:**

        ```
        MUTABLE Stream Statistics:
          Total Records: 38
          Unacknowledged (False): 38 records

        IMMUTABLE Stream Statistics:
          Total Records: 64
          Acknowledged (True): 64 records
        ```

        This confirms that all 64 acknowledged alarms were moved from the mutable stream, which now holds only the 38 unacknowledged alarms, to the immutable archive stream. Together, the two streams account for all 102 original records (38 + 64 = 102).
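        For a scheduled archive job, you can turn this manual check into an automated one. The sketch below takes the two aggregate responses, as returned by the Python example above, and reports any records that went missing or ended up in the wrong stream. `count_by_ack`, `check_archive`, and `expected_total` are hypothetical names; in practice you would record the expected total before archiving. Both counts are limited by the `lastUpdatedTime` window used in the queries.

        ```python
        from typing import Any


        def count_by_ack(response: dict[str, Any]) -> dict[bool, int]:
            """Map is_acknowledged -> record count from a by_acknowledged aggregate response."""
            counts = {True: 0, False: 0}
            for bucket in response["aggregates"]["by_acknowledged"]["uniqueValueBuckets"]:
                value = bucket["value"]
                # Handle booleans returned as strings, as the examples above do
                is_ack = value == "true" if isinstance(value, str) else bool(value)
                counts[is_ack] += bucket["count"]
            return counts


        def check_archive(mutable: dict[str, Any], immutable: dict[str, Any], expected_total: int) -> list[str]:
            """Return a list of problems with the archive run; an empty list means it looks correct."""
            problems = []
            combined = (
                mutable["aggregates"]["total_count"]["count"]
                + immutable["aggregates"]["total_count"]["count"]
            )
            if combined != expected_total:
                problems.append(f"expected {expected_total} records across both streams, found {combined}")
            live, archived = count_by_ack(mutable), count_by_ack(immutable)
            if live[True]:
                problems.append(f"{live[True]} acknowledged alarms are still in the mutable stream")
            if archived[False]:
                problems.append(f"{archived[False]} unacknowledged alarms ended up in the archive")
            return problems


        # Responses reduced to the counts from the expected output above
        mutable_sample = {"aggregates": {"total_count": {"count": 38},
                                         "by_acknowledged": {"uniqueValueBuckets": [{"value": False, "count": 38}]}}}
        immutable_sample = {"aggregates": {"total_count": {"count": 64},
                                           "by_acknowledged": {"uniqueValueBuckets": [{"value": True, "count": 64}]}}}
        print(check_archive(mutable_sample, immutable_sample, expected_total=102))  # → []
        ```

        Returning a list of messages rather than raising on the first problem lets a job log every discrepancy from a single run.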
      </Accordion>
    </AccordionGroup>
  </Step>
</Steps>

<Check>
  You've successfully built a complete SCADA alarm management pipeline using Records! You've created schemas, set up mutable and immutable streams, ingested alarm data, updated records, queried them using multiple endpoints, and implemented a stream-to-stream archiving pattern. This pipeline demonstrates the full lifecycle of managing high-volume OT event data with Records.
</Check>

## Advanced: Build a stream-to-stream transformation pipeline

A common architectural pattern for Records is a data pipeline that moves and transforms data between streams. It separates raw data ingestion from the creation of clean, standardized datasets for consumers. You'll build a three-stage pipeline: ingest raw data to a source stream, transform it using a Hosted REST Extractor, and consume the standardized data from a destination stream.

To build a stream-to-stream transformation pipeline, you need to configure these components:

1. **Source**: Connects to the CDF API to read from the source stream's `/sync` endpoint.
2. **Destination**: Provides credentials to write transformed records back to CDF.
3. **Mapping**: Transforms records from the source stream format to the destination stream format.
4. **Job**: Ties everything together and runs on a schedule with pagination and incremental load handling.
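The job's incremental load and pagination follow the same cursor idea as the archive loop earlier in this guide. The toy model below simulates one job run in memory: read everything after the stored cursor in pages, transform each record, write it, and return the new cursor so the next scheduled run only sees newer records. It is a hypothetical illustration, not the Records or hosted extractor API; `ToyStream`, `run_job`, `standardize`, and the integer cursor are stand-ins (real sync cursors are strings returned by the service).

```python
from dataclasses import dataclass, field
from typing import Any, Callable, Optional

Record = dict[str, Any]


@dataclass
class ToyStream:
    """In-memory stand-in for a stream: an append-only log where a cursor is a position."""

    log: list[Record] = field(default_factory=list)

    def ingest(self, items: list[Record]) -> None:
        self.log.extend(items)

    def sync(self, cursor: Optional[int], limit: int) -> tuple[list[Record], int, bool]:
        """Return (batch, next_cursor, has_next), starting after `cursor`."""
        start = cursor or 0  # None means "first run": start from the beginning
        batch = self.log[start:start + limit]
        next_cursor = start + len(batch)
        return batch, next_cursor, next_cursor < len(self.log)


def run_job(
    source: ToyStream,
    destination: ToyStream,
    transform: Callable[[Record], Record],
    cursor: Optional[int],
    limit: int = 2,
) -> int:
    """One scheduled run: page through new source records, transform them, and write them."""
    while True:
        batch, next_cursor, has_next = source.sync(cursor, limit)
        destination.ingest([transform(record) for record in batch])
        cursor = next_cursor  # advance only after the write has succeeded
        if not has_next:
            return cursor


def standardize(record: Record) -> Record:
    """Example transform: normalize severity labels to upper case."""
    return {**record, "severity": record["severity"].upper()}


raw, clean = ToyStream(), ToyStream()
raw.ingest([{"alarm_id": f"ALARM-{i}", "severity": "critical"} for i in range(3)])
cursor = run_job(raw, clean, standardize, cursor=None)  # first run reads all 3 records
raw.ingest([{"alarm_id": "ALARM-3", "severity": "low"}])
cursor = run_job(raw, clean, standardize, cursor=cursor)  # second run reads only the new one
print(len(clean.log), cursor)  # → 4 4
```

The second run picks up only the record added after the first run, which is the behavior incremental load is meant to give the hosted job.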

<Steps>
  <Step title="Create the source">
    The source defines how the REST extractor connects to the CDF API to read records from your source stream. Since you're reading from CDF, use your CDF cluster as the host and configure OAuth client credentials authentication.

    ```python wrap theme={"languages":{"custom":["/_languages/kuiper.json","../_languages/kuiper.json"]}}
    from cognite.client import CogniteClient
    from cognite.client.data_classes.hosted_extractors.sources import (
        RestSourceWrite,
        RESTClientCredentialsAuthenticationWrite,
    )

    client = CogniteClient()

    source = client.hosted_extractors.sources.create(
        RestSourceWrite(
            external_id="my-records-source",
            host="<your-cdf-cluster>.cognitedata.com",
            scheme="https",
            authentication=RESTClientCredentialsAuthenticationWrite(
                client_id="<your-client-id>",
                client_secret="<your-client-secret>",
                token_url="<your-token-url>",
                scopes="<your-scopes>"
            )
        )
    )
    ```

    Replace `<your-cdf-cluster>`, `<your-client-id>`, `<your-client-secret>`, `<your-token-url>`, and `<your-scopes>` with your actual values.

    <Info>
      The source credentials must have `streamrecords:READ` capability scoped to the source stream's space.
    </Info>
  </Step>

  <Step title="Create the destination">
    The destination provides credentials for the extractor to write transformed records to CDF. Create a session and use its nonce to authenticate the destination.

    ```python wrap theme={"languages":{"custom":["/_languages/kuiper.json","../_languages/kuiper.json"]}}
    from cognite.client.data_classes.hosted_extractors import DestinationWrite, SessionWrite
    from cognite.client.data_classes.iam import ClientCredentials

    # Create a session for the destination
    session = client.iam.sessions.create(
        client_credentials=ClientCredentials(
            client_id="<your-client-id>",
            client_secret="<your-client-secret>",
        ),
        session_type="CLIENT_CREDENTIALS"
    )

    # Create the destination using the session nonce
    destination = client.hosted_extractors.destinations.create(
        DestinationWrite(
            external_id="my-records-destination",
            credentials=SessionWrite(session.nonce)
        )
    )
    ```

    <Info>
      The destination credentials must have `streamrecords:WRITE` capability scoped to the destination stream's space.
    </Info>
  </Step>

  <Step title="Create the mapping">
    The mapping defines how to transform each record from the source stream into the output format for the destination stream. The `/sync` endpoint returns a response containing an `items` array. The mapping iterates over these items and transforms each one.

    For writing to Records streams, the output must include:

    * `type`: Set to `"immutable_record"` for immutable streams.
    * `space`: The space where the destination stream exists.
    * `stream`: The external ID of the destination stream.
    * `externalId`: A unique identifier for each record.
    * `sources`: An array of container sources with their properties.
    * `beta`: Set to `true` while the records destination behavior is still in preview (the JSON field name remains `beta`). See [release states and availability](/cdf/product_feature_status).
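
A quick local check can catch a missing field before you deploy a mapping. The sketch below is plain Python and relies only on the list above. `record_problems` is a hypothetical helper, not part of the Cognite SDK. The sample record and its `severity` property are made up. The check covers only field presence and the `type` and `beta` values this tutorial uses for an immutable destination stream.

```python
from typing import Any

# Fields every output record must include, per the list above.
REQUIRED_FIELDS = ("type", "space", "stream", "externalId", "sources", "beta")


def record_problems(record: dict[str, Any]) -> list[str]:
    """Return problems found in one mapped output record; an empty list means none."""
    problems = [f"missing field: {name}" for name in REQUIRED_FIELDS if name not in record]
    # This tutorial writes to an immutable stream.
    if "type" in record and record["type"] != "immutable_record":
        problems.append(f"unexpected type: {record['type']!r}")
    # Expect the JSON boolean true, as in the mapping expression.
    if "beta" in record and record["beta"] is not True:
        problems.append("beta must be true")
    if "sources" in record and not isinstance(record["sources"], list):
        problems.append("sources must be a list")
    return problems


# Hypothetical output record for one alarm.
sample = {
    "type": "immutable_record",
    "space": "my-space",
    "stream": "my-destination-stream",
    "externalId": "alarm-001",
    "beta": True,
    "sources": [
        {
            "source": {"space": "my-space", "externalId": "my-container", "type": "container"},
            "properties": {"severity": 3},
        }
    ],
}

print(record_problems(sample))
print(record_problems({"type": "immutable_record", "beta": "true"}))
```

An empty result only means the record has the expected shape; it does not guarantee that the destination accepts the write.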

    ```python wrap theme={"languages":{"custom":["/_languages/kuiper.json","../_languages/kuiper.json"]}}
    from cognite.client.data_classes.hosted_extractors.mappings import MappingWrite, CustomMapping

    # Kuiper expression: emit one destination record per item in the /sync response
    mapping_expression = """
    input.items.map(item => {
        "type": "immutable_record",
        "space": "my-space",
        "externalId": item.externalId,
        "stream": "my-destination-stream",
        "beta": true,
        "sources": [
            {
                "source": {
                    "space": "my-space",
                    "externalId": "my-container",
                    "type": "container"
                },
                "properties": item.properties["my-space"]["my-container"]
            }
        ]
    })
    """

    # Create the mapping so the job can reference it by external ID
    mapping = client.hosted_extractors.mappings.create(
        MappingWrite(
            external_id="my-records-mapping",
            mapping=CustomMapping(mapping_expression),
            published=True,
            input="json"
        )
    )
    ```

    See [custom mappings](/cdf/integration/guides/extraction/hosted_extractors/hosted_extractors_custom_mappings) for the full mapping language reference.
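
The sketch below performs the same transformation in plain Python on a hand-written `/sync` response, which makes it easier to see what the Kuiper expression produces. It is a local approximation for reasoning about the mapping, not how the hosted extractor evaluates it. The response shape follows the mapping above: an `items` array whose entries carry `externalId` and `properties` keyed by space and container. The record IDs and the `alarmId` and `severity` properties are hypothetical.

```python
from typing import Any

SPACE = "my-space"
CONTAINER = "my-container"
DESTINATION_STREAM = "my-destination-stream"


def map_sync_response(response: dict[str, Any]) -> list[dict[str, Any]]:
    """Plain-Python counterpart of the Kuiper mapping: one output record per item."""
    return [
        {
            "type": "immutable_record",
            "space": SPACE,
            "externalId": item["externalId"],  # reuse the source record's external ID
            "stream": DESTINATION_STREAM,
            "beta": True,
            "sources": [
                {
                    "source": {"space": SPACE, "externalId": CONTAINER, "type": "container"},
                    # Copy the container's properties unchanged, like item.properties[...][...]
                    "properties": item["properties"][SPACE][CONTAINER],
                }
            ],
        }
        for item in response["items"]
    ]


# Hypothetical /sync response containing two records.
sample_response = {
    "items": [
        {"externalId": "alarm-001", "properties": {SPACE: {CONTAINER: {"alarmId": "A1", "severity": 3}}}},
        {"externalId": "alarm-002", "properties": {SPACE: {CONTAINER: {"alarmId": "A2", "severity": 1}}}},
    ],
}

for record in map_sync_response(sample_response):
    print(record["externalId"], record["sources"][0]["properties"])
```

Because this mapping copies the container properties unchanged, the `properties` expression is the natural place to rename, filter, or derive fields when the destination stream uses a different schema.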
  </Step>

  <Step title="Create the job">
    The job ties everything together. It reads from the source stream's `/sync` endpoint on a schedule, applies the mapping, and writes to the destination. Configure **incremental load** to initialize the cursor on the first run, and **pagination** to handle cursor-based iteration through records.

    ```python wrap theme={"languages":{"custom":["/_languages/kuiper.json","../_languages/kuiper.json"]}}
    from cognite.client.data_classes.hosted_extractors.jobs import (
        JobWrite,
        CustomFormat,
        RestConfig
    )
    from cognite.client.data_classes.hosted_extractors import BodyLoad

    source_stream_id = "my-source-stream"
    project = client.config.project

    job = client.hosted_extractors.jobs.create(
        JobWrite(
            external_id="my-records-sync-job",
            destination_id="my-records-destination",
            source_id="my-records-source",
            format=CustomFormat(mapping_id="my-records-mapping"),
            config=RestConfig(
                interval="1h",
                path=f"/api/v1/projects/{project}/streams/{source_stream_id}/records/sync",
                method="post",
                headers={"Content-Type": "application/json"},
                body={
                    "limit": 1000,
                    "filter": {
                        "hasData": [
                            {
                                "type": "container",
                                "space": "my-space",
                                "externalId": "my-container"
                            }
                        ]
                    }
                },
                # Initialize cursor on first run
                incremental_load=BodyLoad(
                    value='{...context.request.body, ...{"initializeCursor": "30d-ago"}}'
                ),
                # Use cursor from response for subsequent requests
                pagination=BodyLoad(
                    value='if(body.nextCursor, {...context.request.body, ...{"cursor": body.nextCursor}}, null)'
                ),
            ),
        )
    )
    ```

    The job configuration includes:

    | Parameter             | Description                                                                                                             |
    | --------------------- | ----------------------------------------------------------------------------------------------------------------------- |
    | **interval**          | How often the job runs (for example, `1h`, `15m`, `1d`).                                                                |
    | **path**              | The `/sync` endpoint path for the source stream.                                                                        |
    | **body**              | The base request body with limit and filter parameters.                                                                 |
    | **incremental\_load** | Sets `initializeCursor` on the first request to start syncing from a specific point (for example, `30d-ago`).           |
    | **pagination**        | Uses `nextCursor` from the response to paginate through all records. Returns `null` to stop when no more records exist. |

    <Info>
      The `incremental_load` expression merges the base body with the `initializeCursor` parameter. The `pagination` expression checks if `nextCursor` exists in the response and either continues with the cursor or returns `null` to stop pagination.
    </Info>
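
The request sequence produced by `incremental_load` and `pagination` can be simulated locally. In this sketch, `fake_sync` is a hypothetical stand-in for the `/sync` endpoint that serves three pre-built pages. Python's `{**a, **b}` plays the role of Kuiper's `{...a, ...b}` merge. Like the pagination expression, the sketch assumes the endpoint omits `nextCursor` once there is nothing more to read. It does not model how the hosted extractor schedules runs or stores cursors between them.

```python
from typing import Any, Callable, Optional

# Base request body, matching the job configuration above.
BASE_BODY: dict[str, Any] = {
    "limit": 1000,
    "filter": {"hasData": [{"type": "container", "space": "my-space", "externalId": "my-container"}]},
}


def incremental_load(body: dict[str, Any]) -> dict[str, Any]:
    # Mirrors: {...context.request.body, ...{"initializeCursor": "30d-ago"}}
    return {**body, "initializeCursor": "30d-ago"}


def pagination(request_body: dict[str, Any], response: dict[str, Any]) -> Optional[dict[str, Any]]:
    # Mirrors: if(body.nextCursor, {...context.request.body, ...{"cursor": body.nextCursor}}, null)
    if response.get("nextCursor"):
        return {**request_body, "cursor": response["nextCursor"]}
    return None  # no cursor: stop paginating


def run_once(sync: Callable[[dict[str, Any]], dict[str, Any]]) -> list[dict[str, Any]]:
    """Send requests until pagination returns None; collect every item received."""
    items: list[dict[str, Any]] = []
    body: Optional[dict[str, Any]] = incremental_load(BASE_BODY)
    while body is not None:
        response = sync(body)
        items.extend(response["items"])
        body = pagination(body, response)
    return items


# Hypothetical /sync stand-in: pages keyed by the cursor in the request body.
PAGES: dict[Optional[str], dict[str, Any]] = {
    None: {"items": [{"externalId": "r1"}, {"externalId": "r2"}], "nextCursor": "c1"},
    "c1": {"items": [{"externalId": "r3"}], "nextCursor": "c2"},
    "c2": {"items": []},  # nothing left, so no nextCursor
}
requests_seen: list[dict[str, Any]] = []


def fake_sync(body: dict[str, Any]) -> dict[str, Any]:
    requests_seen.append(body)
    return PAGES[body.get("cursor")]


items = run_once(fake_sync)
print([item["externalId"] for item in items])
print([request.get("cursor") for request in requests_seen])
```

In this simulation, each paginated request keeps every key from the previous body, including `initializeCursor`, because the merge adds `cursor` on top of the previous request body rather than replacing it.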
  </Step>

  <Step title="Monitor the job">
    After creating the job, check its logs and metrics to monitor its status.

    ```python wrap theme={"languages":{"custom":["/_languages/kuiper.json","../_languages/kuiper.json"]}}
    # Check job logs
    logs = client.hosted_extractors.jobs.list_logs(job="my-records-sync-job")

    # Check job metrics
    metrics = client.hosted_extractors.jobs.list_metrics(job="my-records-sync-job")
    ```

    **Expected outcome**: The job runs on the configured interval, reads records from the source stream, transforms them using your mapping, and writes them to the destination stream.
  </Step>
</Steps>

This composable architecture lets you ingest data once in its raw format and then derive streams with standardized, use-case-specific record schemas from that source stream. The result is a robust, maintainable data processing ecosystem in CDF for your high-volume OT event and log data.
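
As a rough illustration of the ingest-once, derive-many pattern, the sketch below applies two mapping functions to the same raw source item. Each function produces a record for a different destination stream. In CDF, each of these would be a separate Kuiper mapping and hosted extractor job reading the same source stream. The stream names (`alarms-operations`, `alarms-audit`), the container names, and the `tag`, `severity`, and `operator` properties are hypothetical.

```python
from typing import Any, Callable

RecordMapping = Callable[[dict[str, Any]], dict[str, Any]]

SPACE = "my-space"
RAW_CONTAINER = "my-container"


def build_record(stream: str, container: str, item: dict[str, Any], properties: dict[str, Any]) -> dict[str, Any]:
    """Wrap properties in the output shape used by the mapping in this guide."""
    return {
        "type": "immutable_record",
        "space": SPACE,
        "stream": stream,
        "externalId": item["externalId"],
        "beta": True,
        "sources": [
            {
                "source": {"space": SPACE, "externalId": container, "type": "container"},
                "properties": properties,
            }
        ],
    }


def to_operations_view(item: dict[str, Any]) -> dict[str, Any]:
    # Hypothetical operations stream: keep only what operators triage on.
    raw = item["properties"][SPACE][RAW_CONTAINER]
    return build_record("alarms-operations", "alarm-operations", item,
                        {"tag": raw["tag"], "severity": raw["severity"]})


def to_audit_view(item: dict[str, Any]) -> dict[str, Any]:
    # Hypothetical audit stream: keep who handled the alarm.
    raw = item["properties"][SPACE][RAW_CONTAINER]
    return build_record("alarms-audit", "alarm-audit", item,
                        {"tag": raw["tag"], "operator": raw["operator"]})


MAPPINGS: list[RecordMapping] = [to_operations_view, to_audit_view]

# One hypothetical raw record, as returned by the source stream's /sync endpoint.
raw_item = {
    "externalId": "alarm-001",
    "properties": {SPACE: {RAW_CONTAINER: {"tag": "PUMP-101", "severity": 3, "operator": "jdoe"}}},
}

for mapping in MAPPINGS:
    derived = mapping(raw_item)
    print(derived["stream"], derived["sources"][0]["properties"])
```

Adding a new use case then means adding one more mapping and job, without re-ingesting or changing the raw source stream.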
