Power BI incremental refresh

Configure incremental refresh with the Cognite Data Fusion REST API connector for Power BI to load large volumes of data. This feature lets you refresh only the most recent data, reducing refresh times and improving reliability.

Beta

The features described in this section are currently in beta and are subject to change.

About incremental refresh

Incremental refresh in Power BI partitions your data based on date ranges and refreshes only the partitions that contain the most recent data. Shorter connections to data sources reduce the risk of timeouts, and you can work with data sets that would be impractical to refresh entirely.

Before you start

  • Make sure the endpoint you want to fetch data from supports timestamp-based filtering.

  • Make sure your queries return a consistent schema, even when filtering returns no data.

Set up incremental refresh

  1. Create two Power Query parameters named RangeStart and RangeEnd of type datetime.

  2. Use these parameters to filter the data your queries return. The Power BI Service later sets them to the date range of each partition it refreshes.

  3. Configure an incremental refresh policy in Power BI Desktop.

  4. Publish the model to Power BI Service.
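Conceptually, the filtering in step 2 selects only the rows whose timestamp falls inside the window defined by the two parameters. A minimal Python sketch of that idea, with made-up rows and parameter values (none of these names come from the connector):

```python
from datetime import datetime

# Hypothetical rows, standing in for records returned by a CDF endpoint.
rows = [
    {"externalId": "evt-1", "createdTime": datetime(2024, 1, 10)},
    {"externalId": "evt-2", "createdTime": datetime(2024, 2, 15)},
    {"externalId": "evt-3", "createdTime": datetime(2024, 3, 20)},
]

# Stand-ins for the RangeStart and RangeEnd Power Query parameters.
range_start = datetime(2024, 2, 1)
range_end = datetime(2024, 3, 1)

# Keep only rows inside the refresh window. Power BI re-binds the two
# parameters for each partition, so only this window is re-fetched.
filtered = [r for r in rows if range_start <= r["createdTime"] < range_end]
print([r["externalId"] for r in filtered])  # → ['evt-2']
```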

tip

Set RangeStart and RangeEnd to a small time period while developing to limit the amount of data loaded into Power BI Desktop. Once the model is published, the Power BI Service will automatically adjust these parameters to load the archive partition and the most recent data based on the incremental refresh policy you defined.

For more information about incremental refresh concepts, see Microsoft's Incremental refresh and real-time data for semantic models.

Utility functions

Since CDF endpoints expect timezone-aware timestamps in specific formats, you must convert the RangeStart and RangeEnd values to the appropriate format for each request type:

  • GetCDF/PostCDF: Normal GET/POST requests to the Cognite APIs expect timestamps in milliseconds since epoch (UTC).

  • GraphQL: Data modeling GraphQL requests expect timestamps in ISO 8601 string format.

  • ODataCDF: OData requests expect timestamps in datetimezone format for column filtering.
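For reference, the three representations can be sketched in Python; the example timestamp below is illustrative and plays the role of a RangeStart or RangeEnd value:

```python
from datetime import datetime, timezone

# An example refresh boundary (illustrative value).
dt = datetime(2024, 5, 1, 12, 30, 0, tzinfo=timezone.utc)

# GetCDF/PostCDF: milliseconds since the Unix epoch (UTC).
ms_since_epoch = int(dt.timestamp() * 1000)

# GraphQL: ISO 8601 text.
iso_text = dt.strftime("%Y-%m-%dT%H:%M:%S%z")

# ODataCDF: a timezone-aware datetime value that Power Query can
# compare directly against a timestamp column.
tz_aware = dt

print(ms_since_epoch)  # → 1714566600000
print(iso_text)        # → 2024-05-01T12:30:00+0000
```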

ConvertDateTimeToMs

Convert DateTime to milliseconds since epoch
(dt as nullable datetime) as nullable number =>
    if dt = null then
        null
    else
        let
            // Convert the input datetime to a DateTimeZone
            LocalDateTimeZone = DateTimeZone.From(dt),
            // Convert the local time to UTC
            UTCDateTimeZone = DateTimeZone.SwitchZone(LocalDateTimeZone, 0),
            // Remove the zone information to work with a pure datetime value
            UTCDateTime = DateTimeZone.RemoveZone(UTCDateTimeZone),
            // Define the Unix epoch start in UTC
            UnixEpochStart = #datetime(1970, 1, 1, 0, 0, 0),
            // Calculate the duration between the UTC datetime and Unix epoch start
            Delta = UTCDateTime - UnixEpochStart,
            // Convert duration to total milliseconds and floor the result
            TotalMilliseconds = Number.RoundDown(Duration.TotalSeconds(Delta) * 1000)
        in
            TotalMilliseconds

ConvertDateTimeToIso

Convert DateTime to ISO 8601 text representation
(dt as nullable datetime) as nullable text =>
    if dt = null then
        null
    else
        let
            // Convert the input datetime to a DateTimeZone
            LocalDateTimeZone = DateTimeZone.From(dt),
            // Convert the local time to UTC
            UTCDateTimeZone = DateTimeZone.SwitchZone(LocalDateTimeZone, 0),
            // Use DateTimeZone.ToText with ISO 8601 format
            Result = DateTimeZone.ToText(UTCDateTimeZone, [Format = "yyyy-MM-ddTHH:mm:sszzz", Culture = "en-US"])
        in
            Result

ConvertDateTimeToDateTimeZone

Convert DateTime to DateTimeZone
(dt as nullable datetime) as nullable datetimezone =>
    if dt = null then
        null
    else
        let
            // Convert the input datetime to a DateTimeZone
            LocalDateTimeZone = DateTimeZone.From(dt),
            // Convert the local time to UTC
            UTCDateTimeZone = DateTimeZone.SwitchZone(LocalDateTimeZone, 0)
        in
            UTCDateTimeZone

EnforceSchema

The EnforceSchema function ensures that the output table has a consistent schema, even when the API response is empty. This function takes an input table and a list of expected column names, and returns a table with the specified columns. If any of the expected columns are missing in the input table, they're added with null values.

Enforce schema for a table
(InputTable as table, Columns as list) as table =>
    let
        // If the input table is empty, create an empty table with the required columns.
        BaseTable = if Table.IsEmpty(InputTable) then
            #table(Columns, {})
        else
            InputTable,
        // For each required column, if it's missing in the response, add it with null values.
        TableWithAllColumns = List.Accumulate(
            Columns,
            BaseTable,
            (state, currentColumn) =>
                if List.Contains(Table.ColumnNames(state), currentColumn) then
                    state
                else
                    Table.AddColumn(state, currentColumn, each null)
        ),
        // Reorder (or select) the columns to match the order provided in the Columns list.
        FinalTable = Table.SelectColumns(TableWithAllColumns, Columns)
    in
        FinalTable
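The same schema-enforcement idea can be sketched in Python on a list-of-dicts "table" (function and column names here are illustrative, not part of the connector):

```python
def enforce_schema(rows, columns):
    """Return rows containing exactly `columns`, in order, filling any
    missing field with None — analogous to the M function above.
    (Unlike an M table, an empty list carries no column metadata.)"""
    return [{col: row.get(col) for col in columns} for row in rows]

# An empty response still produces a valid (empty) result.
print(enforce_schema([], ["id", "name"]))  # → []

# Missing fields are padded with None; unexpected fields are dropped.
print(enforce_schema([{"id": 1, "extra": "x"}], ["id", "name"]))
# → [{'id': 1, 'name': None}]
```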

ColumnNamesToText

The ColumnNamesToText function generates a list of column names in a format you can pass to the EnforceSchema function. Run it once on the response from the GetCDF, PostCDF, or GraphQL functions (when the current filter returns at least one entry) to produce the list of column names returned by the API. You can then use this list in the EnforceSchema function to keep the schema consistent even when no data is returned.

Convert column names to text
(InputTable as table) as text =>
    let
        // Get the column names
        columnNames = Table.ColumnNames(InputTable),
        // Wrap each column name in quotes
        quotedColumnNames = List.Transform(columnNames, each """" & _ & """"),
        // Combine them into a single text with comma separation
        combinedText = Text.Combine(quotedColumnNames, ", "),
        // Add curly braces around the combined text
        ColumnNamesAsText = "{ " & combinedText & " }"
    in
        ColumnNamesAsText
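A quick Python analogue of the same formatting (the function name is illustrative), which shows the exact text shape the M function produces:

```python
def column_names_to_text(column_names):
    """Format column names as an M list literal, e.g. { "a", "b" },
    mirroring the ColumnNamesToText M function above."""
    quoted = ", ".join(f'"{name}"' for name in column_names)
    return "{ " + quoted + " }"

print(column_names_to_text(["externalId", "createdTime"]))
# → { "externalId", "createdTime" }
```

You can paste the resulting text directly into a query as the Columns list for EnforceSchema.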

Incremental refresh examples

GetCDF example

This example shows how to configure an incremental refresh when fetching CDF events using the GetCDF function with a URL parameter filter:

Incremental refresh with GetCDF
let
    // Convert parameters to milliseconds since epoch
    RangeStartMs = ConvertDateTimeToMs(RangeStart),
    RangeEndMs = ConvertDateTimeToMs(RangeEnd),
    // Create the request URL with timestamp filters
    QueryURL = Text.Format(
        "/events?includeMetadata=true&minCreatedTime=#{0}&maxCreatedTime=#{1}",
        { Number.ToText(RangeStartMs), Number.ToText(RangeEndMs) }
    ),
    // Fetch data using GetCDF
    EventsTable = GetCDF(QueryURL),
    // Define the expected schema
    Columns = {
        "externalId",
        "dataSetId",
        "startTime",
        "type",
        "subtype",
        "description",
        "metadata",
        "assetIds",
        "source",
        "id",
        "lastUpdatedTime",
        "createdTime"
    },
    // Enforce the schema to ensure a consistent table even if no data is returned
    FinalTable = EnforceSchema(EventsTable, Columns)
in
    FinalTable

PostCDF example

This example shows how to configure an incremental refresh when fetching data modeling nodes with the PostCDF function:

Incremental refresh with PostCDF
let
    // Convert parameters to milliseconds since epoch
    RangeStartMs = ConvertDateTimeToMs(RangeStart),
    RangeEndMs = ConvertDateTimeToMs(RangeEnd),
    // Create the request body with timestamp filters
    Body = [
        sources = {
            [
                source = [
                    type = "view",
                    space = "cdf_cdm",
                    externalId = "CogniteActivity",
                    version = "v1"
                ]
            ]
        },
        filter = [
            range = [
                property = { "node", "createdTime" },
                gt = RangeStartMs,
                lt = RangeEndMs
            ]
        ],
        limit = 1000
    ],
    // Convert the body to JSON text
    BodyText = Text.FromBinary(Json.FromValue(Body)),
    // Fetch data using PostCDF
    NodesTable = PostCDF("/models/instances/list", BodyText),
    // Remove null rows
    CleanedTable = Table.SelectRows(
        NodesTable,
        each List.NonNullCount(Record.ToList(_)) > 0
    ),
    // Define the expected schema
    Columns = {
        "instanceType",
        "version",
        "space",
        "externalId",
        "createdTime",
        "lastUpdatedTime",
        "properties"
    },
    // Enforce the schema to ensure a consistent table even if no data is returned
    FinalTable = EnforceSchema(CleanedTable, Columns)
in
    FinalTable

GraphQL example

This example shows how to implement an incremental refresh with a GraphQL query for data model instances:

Incremental refresh with GraphQL
let
    // Convert parameters to ISO 8601 format
    StartIso = ConvertDateTimeToIso(RangeStart),
    EndIso = ConvertDateTimeToIso(RangeEnd),
    // Create the GraphQL query with timestamp filters
    QueryText = "query MyQuery(
        $cursor: String,
        $rangeStart: Timestamp,
        $rangeEnd: Timestamp
    ) {
        listCogniteTimeSeries(
            first: 1000,
            filter: {
                and: [
                    { createdTime: { gt: $rangeStart } },
                    { createdTime: { lt: $rangeEnd } }
                ]
            },
            after: $cursor
        ) {
            items {
                type
                space
                sourceUnit
                name
                lastUpdatedTime
                isStep
                externalId
                description
                createdTime
            }
        }
    }",
    // Define the variables for the GraphQL query
    QueryVariables = [
        rangeStart = StartIso,
        rangeEnd = EndIso
    ],
    // Convert the variables to JSON text
    QueryVariablesText = Text.FromBinary(Json.FromValue(QueryVariables)),
    // Fetch data using GraphQL
    NodesTable = GraphQL("cdf_cdm", "CogniteCore", "v1", QueryText, QueryVariablesText),
    // Define the expected schema
    Columns = {
        "type",
        "space",
        "sourceUnit",
        "name",
        "lastUpdatedTime",
        "isStep",
        "externalId",
        "description",
        "createdTime"
    },
    // Enforce the schema to ensure a consistent table even if no data is returned
    FinalTable = EnforceSchema(NodesTable, Columns)
in
    FinalTable

ODataCDF example

This example shows an incremental refresh with ODataCDF using column-level filtering, which Power BI can push down to the server. When using OData, you don't need to enforce a schema: the OData service always returns a table with the same schema, even if no data is returned.

Incremental refresh with ODataCDF (column filtering)
let
    // Convert parameters to datetimezone format
    RangeStartUTC = ConvertDateTimeToDateTimeZone(RangeStart),
    RangeEndUTC = ConvertDateTimeToDateTimeZone(RangeEnd),
    // Access the OData service
    Source = ODataCDF("publicdata"),
    // Access the Events table
    EventsTable = Source{[Name = "Events", Signature = "table"]}[Data],
    // Filter the Events table using RangeStart and RangeEnd parameters
    // This filter will be pushed down to the OData service
    EventsWithRangeFilter = Table.SelectRows(
        EventsTable,
        each [CreatedTime] > RangeStartUTC and [CreatedTime] < RangeEndUTC
    )
in
    EventsWithRangeFilter

Configure an incremental refresh

When you've implemented the filtering using RangeStart and RangeEnd, follow these steps to configure an incremental refresh:

  1. In Power BI Desktop, define RangeStart and RangeEnd with values that limit the data loaded to a small period for development and testing.

  2. Apply the filtering logic using one of the supported functions shown in the examples.

  3. Load the data by selecting Close & Apply in the Power Query editor.

  4. In the main Power BI Desktop window, right-click the table you want to configure in the Fields pane, and select Incremental refresh to open the incremental refresh policy configuration window.

  5. Publish your model to the Power BI Service when you've configured the policy.

Refresh in Power BI Service

When you've published your model with an incremental refresh policy, follow these steps to refresh data:

  1. Perform a manual refresh to load all historical data according to your policy. This initial refresh will take longer as it needs to build all partitions and load the historical data.

  2. Configure a scheduled refresh to update only the incremental data according to your policy. These subsequent refreshes will be much faster as they only process the most recent data.