Cognite Kafka extractor
The Cognite Kafka extractor is a generic extractor that extracts data from Apache Kafka. It connects to a Kafka broker and subscribes to a single topic.
The Kafka extractor is hosted by Cognite, so you don't have to download and install anything. To deploy the extractor:
- You need the hostedextractors:READ and hostedextractors:WRITE capabilities to create a Kafka extractor.
- Make sure the extractor has the access capabilities for the data it writes in the Cognite Data Fusion (CDF) project:
  - timeseries:READ and timeseries:WRITE for writing data points and time series
  - events:READ, events:WRITE, and assets:READ for writing events
  - raw:READ and raw:WRITE for writing raw rows
  - datamodelinstances:write for data models
  You can use OpenID Connect and your existing identity provider (IdP) framework to manage access to CDF data securely.
- Navigate to Data management > Integrate > Extractors.
- Locate the Cognite Kafka extractor and select Set up extractor.
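The capabilities the extractor needs depend on what it writes to CDF. As a minimal sketch, assuming you track granted capabilities as plain strings, the hypothetical `REQUIRED_CAPABILITIES` table and `missing_capabilities` helper below (neither is part of any Cognite API) work out which capabilities are still missing for a given set of output types. The capability names are the ones listed on this page.

```python
# Hypothetical table: output type -> CDF capabilities listed on this page.
# Illustrative only; this is not a Cognite API.
REQUIRED_CAPABILITIES: dict[str, set[str]] = {
    "datapoints": {"timeseries:READ", "timeseries:WRITE"},
    "time_series": {"timeseries:READ", "timeseries:WRITE"},
    "events": {"events:READ", "events:WRITE", "assets:READ"},
    "raw": {"raw:READ", "raw:WRITE"},
    "data_models": {"datamodelinstances:write"},
}


def missing_capabilities(granted: set[str], outputs: list[str]) -> set[str]:
    """Return the capabilities still needed to write the given output types."""
    needed: set[str] = set()
    for output in outputs:
        # Union of the requirements for every output type the extractor uses.
        needed |= REQUIRED_CAPABILITIES[output]
    return needed - granted


granted = {"timeseries:READ", "timeseries:WRITE", "events:READ"}
print(sorted(missing_capabilities(granted, ["datapoints", "events"])))
```

Here the extractor can already write data points, but it still lacks events:WRITE and assets:READ before it can write events.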
About Kafka
Apache Kafka is an event streaming platform. Kafka treats message payloads as opaque bytes, so they can be in any format. The Cognite Kafka extractor supports a number of predefined message formats, and you can also define your own.
Messages are published to topics. A Kafka broker has a fixed set of topics, and each topic stores its messages in one or more partitions. When you create a Kafka extractor instance, you must specify the name of the topic and the number of partitions.
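To illustrate how topics and partitions relate, here is a toy in-memory model, not how Kafka or the extractor is implemented. Each partition is an independent append-only log, and offsets are counted per partition, which is why a message is identified by both a partition ID and an offset. Kafka's default partitioner hashes the message key with murmur2. This sketch substitutes `zlib.crc32` purely for illustration, and the `Topic` class is hypothetical.

```python
import zlib
from dataclasses import dataclass, field


@dataclass
class Topic:
    """Toy model of a Kafka topic: one append-only log per partition."""

    name: str
    num_partitions: int
    partitions: list[list[bytes]] = field(init=False)

    def __post_init__(self) -> None:
        # Each partition is an independent, ordered log of message values.
        self.partitions = [[] for _ in range(self.num_partitions)]

    def publish(self, key: bytes, value: bytes) -> tuple[int, int]:
        """Append a message and return (partition_id, offset).

        Uses crc32 instead of Kafka's murmur2 hash, for illustration only.
        The same key always maps to the same partition.
        """
        partition_id = zlib.crc32(key) % self.num_partitions
        log = self.partitions[partition_id]
        log.append(value)
        return partition_id, len(log) - 1  # offsets restart at 0 in every partition


topic = Topic("stream_a", num_partitions=3)
print(topic.publish(b"sensor-1", b"21.5"))
print(topic.publish(b"sensor-1", b"21.7"))
```

Because both messages share a key, they land in the same partition with consecutive offsets, so ordering is preserved per key but not across the whole topic.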
Kafka is highly redundant, so clients usually don't connect to a single fixed broker. Instead, a client connects to one or more brokers from a list of bootstrap brokers, which then tell the client which specific brokers to connect to. This list of bootstrap brokers is what you give the extractor.
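Kafka clients conventionally take the bootstrap list as comma-separated `host:port` pairs (the `bootstrap.servers` setting), with 9092 as the customary port. The exact fields in the extractor's setup form may differ; as a minimal sketch, the hypothetical helper below splits such a list into the individual brokers a client would try.

```python
def parse_bootstrap_servers(servers: str, default_port: int = 9092) -> list[tuple[str, int]]:
    """Split a list like "b1:9092,b2:9093" into (host, port) pairs.

    Hypothetical helper for illustration; entries without a port get default_port.
    """
    brokers: list[tuple[str, int]] = []
    for entry in servers.split(","):
        entry = entry.strip()
        if not entry:
            continue  # tolerate empty entries, e.g. a trailing comma
        host, sep, port = entry.rpartition(":")
        if not sep:
            # No colon: the whole entry is the host name.
            host, port = entry, str(default_port)
        brokers.append((host, int(port)))
    return brokers


print(parse_bootstrap_servers("b1.example.com:9092, b2.example.com:9093"))
```

Listing several brokers means the client can still bootstrap when one of them is unavailable.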
Message formats
See Custom data formats for hosted extractors for how to write your own custom mapping of Kafka messages.
Custom formats used with Kafka jobs receive an input argument containing the message as JSON, and a context argument containing the topic and other message metadata, for example:
{
"key": [1, 2, 3, 4],
"topic": "stream_a",
"headers": {
"headerA": "valueA",
"headerB": "valueB"
},
"timestamp": 1710312447698,
"current_offset": 5912,
"partition_id": 1
}
This information can be used to help construct data points or other supported output types.
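Custom formats are written in the mapping language described in Custom data formats for hosted extractors, not in Python. The Python sketch below only illustrates the idea of combining the input and context arguments. It assumes the message body looks like `{"value": 21.5}` and that the key arrives as a list of byte values, as in the example context. The `to_datapoint` function and its output keys are hypothetical, not the extractor's actual output schema.

```python
def to_datapoint(message: dict, context: dict) -> dict:
    """Build a hypothetical data point from a Kafka message and its context."""
    key = bytes(context["key"])  # assumes the key is a list of byte values
    return {
        # One time series per topic and message key, e.g. "stream_a:01020304".
        "externalId": f'{context["topic"]}:{key.hex()}',
        # Kafka record timestamps are milliseconds since the Unix epoch.
        "timestamp": context["timestamp"],
        "value": message["value"],
        # Keep where the message came from, which helps with debugging.
        "metadata": {
            "partition": context["partition_id"],
            "offset": context["current_offset"],
            "headerA": context["headers"].get("headerA"),
        },
    }


context = {
    "key": [1, 2, 3, 4],
    "topic": "stream_a",
    "headers": {"headerA": "valueA", "headerB": "valueB"},
    "timestamp": 1710312447698,
    "current_offset": 5912,
    "partition_id": 1,
}
print(to_datapoint({"value": 21.5}, context))
```

Deriving the external ID from the topic and key is one possible design choice; a real mapping might instead read an identifier from the message body.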