Set up the EDM extractor
You can run the Cognite EDM extractor as a Java .jar file (platform independent) or as a Docker container image.
Before you start
- Check the server requirements for the extractor.
- Make sure the extractor has the following access capabilities in a Cognite Data Fusion (CDF) project: raw:read, raw:write, and raw:list.
- Create a config directory and add a configuration file according to the configuration settings. The file must be in YAML format.
- If you're running the extractor as a Docker image, you need access to Cognite's internal Docker registry. Contact Cognite Support to get access.
Run as a .jar file
- Navigate to Data management > Integrate > Extractors to download the .jar file.
- Run the .jar file:
    java -jar path/to/cognite-edm-extractor.jar path/to/config.yml
Run as a Docker container
- Authenticate with Cognite's internal Docker registry so that Docker can pull the image:
    gcloud auth configure-docker
    gcloud auth login --no-launch-browser
- Enter this docker run statement:
    docker run -v /path/to/config.yml:/config/config.yml \
        -it eu.gcr.io/cognite-registry/edm-connector:<version>
  Where:
  - docker run starts a new container.
  - -v /path/to/config.yml:/config/config.yml shares the file at /path/to/config.yml with the container, putting it at /config/config.yml, which is the default configuration file location for the Docker image.
  - eu.gcr.io/cognite-registry/edm-connector:<version> is the name of the image. Replace <version> with the version you'll run, for example latest. Cognite strongly recommends running the latest version.
Load data incrementally
You can set up the extractor for full-load extractions or incremental extractions. To load data incrementally, add the delta-supported EDM entities to the entities configuration parameter and set a schedule for checking incremental EDM entities with the gracePeriod configuration parameter. The extractor creates a CDF RAW table for each EDM entity.
For listed entities that don't support incremental extraction, a full extraction is triggered according to the schedule configured in the crontabComplete configuration parameter. EDM doesn't expose a create or update time for each row.
Navigating the extracted data
The extractor sends the EDM records to CDF RAW, creating a table for each EDM entity defined in the extractor configuration. In addition, it creates two tables to hold the state of the extractor:
- CdfEdmObjectInfos: Stores a list of the unique objects across all entities that have been extracted from EDM. When an object changes, a new revision is added to the entity-specific table, and the location link column for that entry is updated to point to the new version. This table also holds the state of whether the object has been deleted in the source EDM installation. This is periodically checked if you've configured the consistencySchedule configuration parameter.
- CdfEdmExtractionStates: Stores information about the extraction and when it last ran successfully.
There are two methods to process the extracted records:
- Iterate through the rows in CdfEdmObjectInfos and filter on the entity types relevant for the client (the entity type is in the type field). Then, for each row, retrieve the serialized object by looking up the key found in locationLink in the matching entity RAW table. This ensures that the latest version is retrieved. Note that the state field specifies whether the entity instance is ACTIVE or DELETED.
- Iterate through the rows of the entity-specific table and process them directly. This returns all versions of the entity instances the extractor has seen. The client should verify whether an object is deleted by looking up the matching entry in CdfEdmObjectInfos. To find the key to look up in CdfEdmObjectInfos, remove the trailing version from the key in the entity-specific table. The trailing version is an underscore followed by a positive number.
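The two methods above can be sketched in plain Python. This is a minimal illustration under stated assumptions, not the extractor's own code: it assumes the RAW rows have already been read into dictionaries that map each row key to its columns, and the helper names (object_info_key, latest_objects, is_deleted) are hypothetical. The column names type, locationLink, state, and edmObject are the ones described in this section.

```python
import re

# Trailing version suffix: an underscore followed by a number.
_VERSION_SUFFIX = re.compile(r"_\d+$")


def object_info_key(versioned_key):
    """Strip the trailing version from an entity-table key.

    The result is the key to look up in CdfEdmObjectInfos.
    """
    return _VERSION_SUFFIX.sub("", versioned_key)


def latest_objects(object_infos, entity_rows, entity_type):
    """Method 1: return the latest serialized object per ACTIVE instance.

    object_infos: {key: columns} read from CdfEdmObjectInfos (assumed shape)
    entity_rows:  {key: columns} read from the entity-specific table (assumed shape)
    """
    result = {}
    for key, info in object_infos.items():
        if info.get("type") != entity_type or info.get("state") != "ACTIVE":
            continue
        # locationLink holds the key of the newest revision in the entity table.
        row = entity_rows.get(info["locationLink"])
        if row is not None:
            result[key] = row["edmObject"]
    return result


def is_deleted(versioned_key, object_infos):
    """Method 2: check whether a row of the entity table belongs to a deleted object."""
    info = object_infos.get(object_info_key(versioned_key), {})
    return info.get("state") == "DELETED"
```

The first method reads each object once, at its latest version. The second method sees every stored revision, so the deletion check has to be made for each row that is processed.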
Once a full iteration has been done, the client should save the maximum value for lastUpdatedTime encountered for rows, and then on the next iteration query for new rows since this time by setting the minLastUpdatedTime query parameter to the value minus 1 (since minLastUpdatedTime is exclusive). See the RAW API for further details.
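The polling pattern described above could look roughly like the following sketch. It assumes the Cognite Python SDK, where client.raw.rows.list accepts min_last_updated_time and each returned row exposes last_updated_time. The function name fetch_new_rows and the cursor handling are hypothetical, and the client is responsible for persisting the cursor between runs.

```python
def fetch_new_rows(client, db_name, table_name, cursor=None):
    """Return (rows, next_cursor) for rows updated since the previous pass.

    cursor is the value to send as min_last_updated_time. Use None for the
    first, full iteration.
    """
    rows = list(
        client.raw.rows.list(
            db_name=db_name,
            table_name=table_name,
            min_last_updated_time=cursor,
            limit=None,
        )
    )
    if not rows:
        # Nothing new: keep the previous cursor.
        return rows, cursor
    newest = max(row.last_updated_time for row in rows)
    # minLastUpdatedTime is exclusive, so step back by one.
    return rows, newest - 1
```

Because the cursor is set to the maximum value minus 1, rows carrying that maximum timestamp are returned again on the next pass. Row processing should therefore be idempotent.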
For entities that support incremental processing, the value persisted for the entity in CdfEdmExtractionStates is used to query for new instances on each iteration. To allow for clock skew between the EDM installation and the client, a buffer period is subtracted from the timestamp before it's used to query for objects that have a newer update or create value in EDM. To make the extractor re-process an incremental EDM entity from scratch, delete the row for that entity from the CdfEdmExtractionStates table and restart the extractor.
Using the extracted data
The records are persisted as protobuf in the edmObject column for each row in the entity-specific table. Protobuf retains both the typing of the properties and the precision of the values.
This is an example of what an extracted CD_WELLBORE record looks like in CDF RAW:
{
"edmObject": "1nrwSJrvVfVyqSGRkPG1WSK0yRVwbvpmSMFH5inySCnFW9btRSQDNNPZYPNDHANNNVjfbONjbOoqbONjbOorbOPDbUD09AHRSGH4VPPDbUD09AHRSGH5VPODbQMTIaftVQPtSghtVSQDNNPZYXNtZXNJ3nNtZXNIavNtZXNJ2nNk8XUKOyqT1wMFuxLKcfoJguLaOjpzDjASkjMKEgL2HcdtZXMRSjIT1nrwSJriVQPNbToJI0MKWmttDSPtAxMJrnONLVd8sUzNndONZXNJ2lONZXNJ3XONZXNJ3FONZXNJ3vONcmZIyWGz9dHH9c8tDQPtSgvtHQPtSgltHQPtSg0tHQPtSg2tHTPZsh85pT4tHQPtSgvtLQPtSgjtLQPtSg0tLSPtAxMJsvOtHANNNVjhbTUjbqp2yxn3IhXTEurzkgn2SvpUOlMQN0KUAcMTg1ovalOtZXNHi6OtZXNH6vOjZXNJ2dOmLXARIRAGNtXvOSHSAUYH5ipvOGAwVtZwNjZFNiVSIHGFO"
}
To explore the extracted records, you can use the cognite-edm-data Python library to decode the protobuf objects into Python objects. For other languages (such as Java and C#), the extractor distribution includes the protobuf specification used to persist the objects in the file distribution-protobuf.proto. You can use this specification as input for the protoc tool to generate code for decoding the persisted objects.
The following Python example shows how to decode EDM protobuf objects. It converts each serialized object into a Python dictionary.
from base64 import b64decode

import cognite.edm.connector.native.edm_native_v1_pb2 as edm_protobuf
from google.protobuf.json_format import MessageToDict


def decode_object(data):
    # The edmObject value is base64 text wrapping a serialized EDMObjects message.
    decoded = b64decode(data)
    message = edm_protobuf.EDMObjects.FromString(decoded)
    # Convert the first object to a dictionary, keeping the proto field names.
    return MessageToDict(message.objects[0], preserving_proto_field_name=True)


# Expects a CDF client, the RAW database name, and the EDM RAW table name.
def read_objects(client, db_name, table_name):
    # Read the columns of every row in the table.
    raw = [
        row.columns
        for row in client.raw.rows(
            db_name=db_name,
            table_name=table_name,
            limit=None,
        )
    ]
    # Decode the serialized object stored in each row.
    return [
        decode_object(row['edmObject'])
        for row in raw
    ]