Set up the EDM extractor

You can run the Cognite EDM extractor as a Java .jar file (platform independent) or as a Docker container image.

Before you start

  1. Check the server requirements for the extractor.

  2. Make sure the extractor has the following access capabilities in a Cognite Data Fusion (CDF) project:

    • raw:read, raw:write, and raw:list
  3. Create a config directory and add a configuration file according to the configuration settings. The file must be in YAML format.

  4. If you're running the extractor as a Docker image, you need access to Cognite's internal Docker registry. Contact Cognite Support to get access.

Run as a .jar file

  1. Navigate to Data management > Integrate > Extractors to download the .jar file.

  2. Run the .jar file:

    java -jar path/to/cognite-edm-extractor.jar path/to/config.yml

Run as a Docker container

  1. Authenticate with Cognite's internal Docker registry so that Docker can pull the image:

    gcloud auth configure-docker
    gcloud auth login --no-launch-browser
  2. Start the container with docker run:

    docker run -v /path/to/config.yml:/config/config.yml \
    -it eu.gcr.io/cognite-registry/edm-connector:<version>

    Where:

    • docker run starts a new container.
    • -v /path/to/config.yml:/config/config.yml shares the file at /path/to/config.yml with the container, putting it at /config/config.yml, which is the default configuration file location for the Docker image.
    • -it keeps standard input open and allocates a terminal for the container.
    • eu.gcr.io/cognite-registry/edm-connector:<version> is the name of the image. Replace <version> with the version you'll run, or use latest. Cognite strongly recommends running the latest version.

Load data incrementally

You can set up the extractor for full-load extractions or incremental extractions. To load data incrementally, add the delta-supported EDM entities to the entities configuration parameter and set a schedule for checking incremental EDM entities with the gracePeriod configuration parameter. The extractor creates a CDF RAW table for each EDM entity.

A full extraction is triggered according to the configured schedule in the crontabComplete configuration parameter for listed entities that don't support incremental extraction. EDM doesn't expose a create or update time for each row.

The extractor sends the EDM records to CDF RAW, creating a table for each EDM entity defined in the extractor configuration. In addition, it creates two tables to hold the state of the extractor:

  • CdfEdmObjectInfos: Stores a list of the unique objects across all entities that have been extracted from EDM. When an object changes, a new revision is added to the entity-specific table, and the location link column for that entry is updated to point to the new version. This table also holds the state of whether the object has been deleted in the source EDM installation. This is periodically checked if you've configured the consistencySchedule configuration parameter.
  • CdfEdmExtractionStates: Stores the information about the extraction and when it last successfully ran.

There are two methods to process the extracted records:

  • Iterate through the rows in CdfEdmObjectInfos and filter on the entity types relevant for the client (the entity type is in the type field). Then for each of the rows, retrieve the serialized object by looking up the key found in locationLink in the matching entity RAW table. This ensures that the last version is retrieved. Note that the state field specifies whether the entity instance is ACTIVE or DELETED.
  • Iterate through the rows of the entity-specific table and process them directly. This returns all versions of the entity instances the extractor has seen. The client should verify whether an object is deleted by looking up the matching entry in CdfEdmObjectInfos. To find the key to look up in CdfEdmObjectInfos, remove the trailing version (an underscore followed by a positive number) from the key in the entity-specific table.
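
The two methods above can be sketched in Python as follows. This is a minimal illustration, not the extractor's own code. It assumes the rows of CdfEdmObjectInfos and of the entity-specific table have already been read into plain dictionaries keyed by RAW row key, and that type, state, and locationLink are stored as columns. The function names and the key format used in the usage note (such as W1_2) are hypothetical.

```python
import re

# A trailing version is an underscore followed by a positive number, e.g. "_3".
_VERSION_SUFFIX = re.compile(r"_[0-9]+$")


def object_info_key(entity_row_key):
    """Strip the trailing version from an entity-table key (hypothetical helper)."""
    return _VERSION_SUFFIX.sub("", entity_row_key)


def latest_active_objects(object_infos, entity_rows, entity_type):
    """Method 1: walk CdfEdmObjectInfos and follow locationLink to the latest version.

    object_infos: {row key: columns} for CdfEdmObjectInfos (assumed layout).
    entity_rows:  {row key: columns} for the entity-specific table.
    Yields (object key, serialized edmObject) for ACTIVE objects of entity_type.
    """
    for key, info in object_infos.items():
        if info.get("type") != entity_type or info.get("state") != "ACTIVE":
            continue
        row = entity_rows.get(info["locationLink"])
        if row is not None:
            yield key, row["edmObject"]


def is_deleted(entity_row_key, object_infos):
    """Method 2: check the deletion state for a row of the entity-specific table."""
    info = object_infos.get(object_info_key(entity_row_key))
    return info is not None and info.get("state") == "DELETED"
```

For example, with an entity-table key W1_2, object_info_key returns W1, which is then used to find the object's state in CdfEdmObjectInfos.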

Once a full iteration has been done, the client should save the maximum value for lastUpdatedTime encountered for rows, and then on the next iteration query for new rows since this time by setting the minLastUpdatedTime query parameter to the value minus 1 (since minLastUpdatedTime is exclusive). See the RAW API for further details.
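
A minimal sketch of this bookkeeping, assuming the Cognite Python SDK, where client.raw.rows.list accepts a min_last_updated_time argument and each returned row has a last_updated_time attribute. The function names read_new_rows and next_min_last_updated_time are hypothetical, and how the cursor is stored between runs is left to the client.

```python
def next_min_last_updated_time(last_updated_times, previous=None):
    """Return the cursor for the next iteration.

    minLastUpdatedTime is exclusive, so the cursor is the highest
    lastUpdatedTime seen minus 1. If no rows were returned, the
    previous cursor is kept.
    """
    times = list(last_updated_times)
    if not times:
        return previous
    return max(times) - 1


def read_new_rows(client, db_name, table_name, cursor=None):
    """Fetch rows updated after the cursor and compute the next cursor."""
    rows = client.raw.rows.list(
        db_name,
        table_name,
        min_last_updated_time=cursor,
        limit=None,
    )
    new_cursor = next_min_last_updated_time(
        (row.last_updated_time for row in rows), previous=cursor
    )
    return rows, new_cursor
```

Because the cursor sits one below the highest timestamp, the most recently updated rows are returned again on the next call, so the client should treat rows with the same key as replacements rather than new data.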

For entities that support incremental processing, the value persisted for the entity in CdfEdmExtractionStates is used to query for new instances on each iteration. To allow for clock skew between the EDM installation and the client, a buffer period is subtracted from the timestamp before it's used to query for objects that have a newer update or create value in EDM. To make the extractor re-process an incremental EDM entity from scratch, delete the row for that entity from the CdfEdmExtractionStates table and restart the extractor.
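
The buffer arithmetic can be illustrated as below. This is only a sketch of the idea, not the extractor's implementation: the function name is hypothetical, and the five-minute buffer in the usage note is an arbitrary illustrative value, since the actual buffer length isn't stated here.

```python
from datetime import datetime, timedelta, timezone


def incremental_query_start(last_successful_run, buffer):
    """Return the timestamp to query EDM from.

    Subtracting the buffer means objects changed shortly before the
    persisted timestamp are picked up again, which covers clock skew
    between the EDM installation and the client.
    """
    return last_successful_run - buffer
```

For instance, with a persisted timestamp of 12:00 UTC and an assumed buffer of timedelta(minutes=5), the query would cover objects created or updated after 11:55 UTC.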

Using the extracted data

The records are persisted as protobuf in the edmObject column for each row in the entity-specific table. Protobuf retains both the typing of the properties and the precision of the values.

This is an example of what a CD_WELLBORE extracted record looks like in CDF RAW:

{
"edmObject": "1nrwSJrvVfVyqSGRkPG1WSK0yRVwbvpmSMFH5inySCnFW9btRSQDNNPZYPNDHANNNVjfbONjbOoqbONjbOorbOPDbUD09AHRSGH4VPPDbUD09AHRSGH5VPODbQMTIaftVQPtSghtVSQDNNPZYXNtZXNJ3nNtZXNIavNtZXNJ2nNk8XUKOyqT1wMFuxLKcfoJguLaOjpzDjASkjMKEgL2HcdtZXMRSjIT1nrwSJriVQPNbToJI0MKWmttDSPtAxMJrnONLVd8sUzNndONZXNJ2lONZXNJ3XONZXNJ3FONZXNJ3vONcmZIyWGz9dHH9c8tDQPtSgvtHQPtSgltHQPtSg0tHQPtSg2tHTPZsh85pT4tHQPtSgvtLQPtSgjtLQPtSg0tLSPtAxMJsvOtHANNNVjhbTUjbqp2yxn3IhXTEurzkgn2SvpUOlMQN0KUAcMTg1ovalOtZXNHi6OtZXNH6vOjZXNJ2dOmLXARIRAGNtXvOSHSAUYH5ipvOGAwVtZwNjZFNiVSIHGFO"
}

To explore the extracted records, you can use the cognite-edm-data Python library to decode the protobuf objects into Python objects. For other languages (such as Java and C#), the extractor distribution includes the protobuf specification used to persist the objects in the file distribution-protobuf.proto. You can use this specification as input for the protoc tool to generate code for decoding the persisted objects.

The following Python snippet shows how to decode EDM protobuf objects. It converts each serialized object into a Python dictionary.

import cognite.edm.connector.native.edm_native_v1_pb2 as edm_protobuf
from base64 import b64decode
from google.protobuf.json_format import MessageToDict


def decode_object(data):
    decoded = b64decode(data)
    edm = edm_protobuf.EDMObjects()
    message = edm.FromString(decoded)
    return MessageToDict(message.objects[0], preserving_proto_field_name=True)


# function expects CDF client, raw database name and edm raw table name as parameters
def read_objects(client, db_name, table_name):
    raw = [
        row.columns
        for row in client.raw.rows(
            db_name=db_name,
            table_name=table_name,
            limit=None,
        )
    ]

    return [
        decode_object(row['edmObject'])
        for row in raw
    ]