
Cognite Kafka extractor

The Cognite Kafka extractor is a generic extractor that extracts data from Apache Kafka. It connects to a Kafka broker and subscribes to a single topic.

The Kafka extractor is hosted by Cognite, so you don't have to download and install anything. To deploy the extractor:

  1. You need hostedextractors:READ and hostedextractors:WRITE capabilities to create a Kafka extractor.

  2. Make sure the extractor has the capabilities it needs in the Cognite Data Fusion (CDF) project: timeseries:READ and timeseries:WRITE for writing data points and time series; events:READ, events:WRITE, and assets:READ for writing events; raw:READ and raw:WRITE for writing raw rows; or datamodelinstances:write for writing to data models.

    Tip

    You can use OpenID Connect and your existing identity provider (IdP) framework to manage access to CDF data securely.

  3. Navigate to Data management > Integrate > Extractors.

  4. Locate the Cognite Kafka extractor and select Set up extractor.

About Kafka

Kafka is an event streaming platform from Apache. It supports payloads in any format. The Cognite Kafka extractor supports a number of predefined message formats, and you can also define your own.

Messages are published on topics. A Kafka broker has a fixed set of topics, each of which writes data to one or more partitions. When you create a Kafka extractor instance, you must specify the name of the topic and its number of partitions.

Kafka is highly redundant, so a client typically does not connect to one fixed broker. Instead, it contacts one or more brokers from a list of bootstrap brokers, which then tell the client which specific broker to connect to. This list of bootstrap brokers is what you give the extractor.
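As a rough illustration of the three values described above (topic name, partition count, and bootstrap broker list), the sketch below collects them in one place. The class `KafkaSourceSettings`, the function `parse_bootstrap_brokers`, the host names, and the comma-separated `host:port` input are all hypothetical and are not part of the extractor's configuration interface. The fallback to port 9092 assumes Kafka's conventional default port.

```python
from dataclasses import dataclass

DEFAULT_KAFKA_PORT = 9092  # assumption: Kafka's conventional default port


@dataclass(frozen=True)
class KafkaSourceSettings:
    """Hypothetical container for the values you supply to the extractor."""
    topic: str
    partitions: int
    bootstrap_brokers: list[tuple[str, int]]


def parse_bootstrap_brokers(raw: str) -> list[tuple[str, int]]:
    """Split 'host:port,host:port' into (host, port) pairs."""
    brokers = []
    for entry in raw.split(","):
        entry = entry.strip()
        if not entry:
            continue  # ignore empty entries such as a trailing comma
        host, sep, port = entry.rpartition(":")
        if not sep:  # no port given, fall back to the default
            host, port = entry, str(DEFAULT_KAFKA_PORT)
        brokers.append((host, int(port)))
    return brokers


# Hypothetical host names; any one reachable broker is enough to bootstrap.
settings = KafkaSourceSettings(
    topic="stream_a",
    partitions=3,
    bootstrap_brokers=parse_bootstrap_brokers(
        "broker-1.example.com:9092, broker-2.example.com"
    ),
)
```

Listing more than one bootstrap broker means the client can still discover the cluster if one of the listed brokers is unavailable.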

Message formats

See Custom data formats for hosted extractors for how to write your own custom mapping of Kafka messages. Custom formats used with Kafka jobs receive an input argument containing the message as JSON, and a context argument containing the topic and other message metadata. For example, the context argument may look like this:

{
  "key": [1, 2, 3, 4],
  "topic": "stream_a",
  "headers": {
    "headerA": "valueA",
    "headerB": "valueB"
  },
  "timestamp": 1710312447698,
  "current_offset": 5912,
  "partition_id": 1
}

This information can be used to help construct data points or other supported output types.
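The sketch below shows, in Python, the kind of logic a custom format might express when combining the message body with the context metadata. It is only an illustration: real custom formats are written in the hosted extractor mapping language described in Custom data formats for hosted extractors, not in Python. The message shape (`sensor`, `value`), the function `to_datapoint`, and the output field names are assumptions made for this example.

```python
import json

# Context metadata, as in the example above.
context = json.loads("""
{
  "key": [1, 2, 3, 4],
  "topic": "stream_a",
  "headers": {"headerA": "valueA", "headerB": "valueB"},
  "timestamp": 1710312447698,
  "current_offset": 5912,
  "partition_id": 1
}
""")

# Hypothetical message body; the real shape depends on the producer.
message = {"sensor": "pump_1", "value": 42.5}


def to_datapoint(message: dict, context: dict) -> dict:
    """Combine a decoded message with Kafka metadata into one data point.

    The output field names are assumptions for illustration only.
    """
    return {
        "type": "datapoint",
        # Prefix the ID with the topic so sources on different topics stay distinct.
        "externalId": f"{context['topic']}:{message['sensor']}",
        # Use the Kafka message timestamp when the body carries none.
        "timestamp": context["timestamp"],
        "value": message["value"],
    }


datapoint = to_datapoint(message, context)
```

Here the topic name disambiguates the time series identifier and the Kafka timestamp stands in for a timestamp that the message body does not carry.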