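Power Query functions and example queries
Combine the functions exposed by the Cognite Data Fusion (REST API) connector for Power BI with Power Query to fetch and transform data through the Cognite API, and build reporting dashboards with Microsoft Power BI and Microsoft Excel.
The features described in this section are currently in beta testing with selected customers and are subject to change.
Copy and adapt the queries and Power Query functions on this page to fit your business needs.
Utility functions
Convert timestamps from/to epoch
CDF resource types expect and return timestamps as milliseconds since the Unix epoch. Power Query has no built-in method to parse this format into a datetimezone value, the type that represents time zone-aware timestamps. CDF data models represent timestamps in primitive fields using the ISO 8601 format.
Use the functions below to convert between datetimezone values and milliseconds since the Unix epoch, and to convert a datetimezone value to text in ISO 8601 format.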
ConvertDateTimeZoneToMs
(dtz as datetimezone) as number =>
let
// Convert the input DateTimeZone to UTC
UtcDateTime = DateTimeZone.RemoveZone(DateTimeZone.SwitchZone(dtz, 0)),
// Define the Unix epoch start
UnixEpochStart = #datetime(1970, 1, 1, 0, 0, 0),
// Calculate the duration between the input date and Unix epoch start
Delta = UtcDateTime - UnixEpochStart,
// Convert duration to total milliseconds
TotalMilliseconds = Duration.TotalSeconds(Delta) * 1000
in
TotalMilliseconds
ConvertMsToDateTimeZone
(ms as number) as datetimezone =>
let
// Convert ms to seconds
SecondsSinceEpoch = ms / 1000,
// Create a duration
DurationSinceEpoch = #duration(0, 0, 0, SecondsSinceEpoch),
// Add duration to Unix epoch start to get UTC datetime
UnixEpochStart = #datetime(1970, 1, 1, 0, 0, 0),
UtcDateTime = UnixEpochStart + DurationSinceEpoch,
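// Mark the value as UTC, then convert it to the local time zone.
// DateTimeZone.From on a plain datetime would treat it as local time instead of converting it.
LocalDateTimeZone = DateTimeZone.ToLocal(DateTime.AddZone(UtcDateTime, 0))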
in
LocalDateTimeZone
ConvertDateTimeZoneToIso
(dtz as datetimezone) as text =>
let
// Use DateTimeZone.ToText with ISO 8601 format
Result = DateTimeZone.ToText(dtz, [Format="yyyy-MM-ddTHH:mm:sszzz", Culture="en-US"])
in
Result
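As a rough illustration of how the three utility functions fit together, the query below converts one timestamp in each direction. It is a sketch that assumes the functions have been saved in the workbook or report under the names shown above; the input value is hypothetical.

```powerquery
let
    // Hypothetical input: midnight on 1 October 2024 at UTC+02:00
    Original = #datetimezone(2024, 10, 1, 0, 0, 0, 2, 0),
    // datetimezone -> milliseconds since the Unix epoch
    AsMs = ConvertDateTimeZoneToMs(Original),
    // milliseconds -> datetimezone in the local time zone of the machine
    BackToLocal = ConvertMsToDateTimeZone(AsMs),
    // datetimezone -> ISO 8601 text, the format used by data model timestamps
    AsIso = ConvertDateTimeZoneToIso(Original),
    Result = [Ms = AsMs, Local = BackToLocal, Iso = AsIso]
in
    Result
```

The Local field is expressed in the time zone of the machine that runs the query, so its offset can differ from the offset of the input even though both values describe the same instant.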
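Add functions
To add a new function in Power Query, select Get Data > Blank Query, then write your function or copy one of the functions below.
Time deltas
It is common to define start and end timestamps based on time deltas. The values are updated each time the dataset is refreshed. In the example below, EndTime is the current time and StartTime is 7 days before EndTime. You can adapt the example to use different time deltas.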
CurrentTime = DateTimeZone.LocalNow(),
EndTime = CurrentTime,
StartTime = CurrentTime - #duration(7, 0, 0, 0)
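The snippet above is a fragment of a let expression. A complete query built on the same idea might look like the sketch below, which uses a 12-hour window and converts both ends to epoch milliseconds. It assumes the ConvertDateTimeZoneToMs utility function from this page is available under that name; the Range record and its field names are only illustrative.

```powerquery
let
    CurrentTime = DateTimeZone.LocalNow(),
    EndTime = CurrentTime,
    // #duration takes days, hours, minutes, seconds: here the last 12 hours
    StartTime = CurrentTime - #duration(0, 12, 0, 0),
    // Epoch milliseconds, rounded to whole numbers
    Range = [
        start = Number.Round(ConvertDateTimeZoneToMs(StartTime)),
        end = Number.Round(ConvertDateTimeZoneToMs(EndTime))
    ]
in
    Range
```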
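Common GET request
When you fetch data from CDF with the GetCDF function from the CDF REST API connector for Power BI, you must pass filters as query parameters to select which data to fetch.
The example below shows how to add the externalIdPrefix and limit query parameters of the /timeseries endpoint to the URL so that the data is filtered on the server side.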
let
Source = GetCDF("/timeseries?externalIdPrefix=EVE&limit=1000")
in
Source
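When the parameter values come from other queries or user input, building the query string by hand can be error-prone. One possible approach, sketched below, is to keep the parameters in a record and let the standard Uri.BuildQueryString function encode them. The record and step names are illustrative and not part of the connector.

```powerquery
let
    // Query parameters as a record; values are passed as text
    Params = [externalIdPrefix = "EVE", limit = "1000"],
    // Uri.BuildQueryString URL-encodes the values and joins the pairs with "&"
    QueryString = Uri.BuildQueryString(Params),
    Source = GetCDF("/timeseries?" & QueryString)
in
    Source
```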
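Common POST request
When you fetch data from CDF with the PostCDF function, you must write a request body to select which data to fetch. The function accepts a text representation of the JSON body, but you can also write the body as a Power Query record and convert it to JSON text before passing the value to the PostCDF function.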
let
SpaceExternalId = "Geography",
ViewExternalId = "City",
ViewVersion = "1",
Body = [
sources = {
[
source = [
type = "view",
space = SpaceExternalId,
externalId = ViewExternalId,
version = ViewVersion
]
]
},
limit = 1000
],
BodyText = Text.FromBinary(Json.FromValue(Body)),
Source = PostCDF("/models/instances/list", BodyText)
in
Source
Alternatively, you can write the POST body manually as text, but you must escape each double quote (") with another double-quote character:
let
BodyText = "{""sources"": [{""source"": {""type"": ""view"", ""space"": ""Geography"", ""externalId"": ""City"", ""version"": ""1""}}], ""limit"": 1000}",
Source = PostCDF("/models/instances/list", BodyText)
in
Source
If you need to reuse a POST request, you can turn it into a Power Query function. For example:
(SpaceExternalId as text, ViewExternalId as text, ViewVersion as text) as table =>
let
Body = [
sources = {
[
source = [
type = "view",
space = SpaceExternalId,
externalId = ViewExternalId,
version = ViewVersion
]
]
},
limit = 1000
],
BodyText = Text.FromBinary(Json.FromValue(Body)),
Source = PostCDF("/models/instances/list", BodyText)
in
Source
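To set the function name, right-click the entry in the query list of the Power Query editor and select Rename.
If the function above is named ListInstancesDMS, you can use it in a new query either by entering the field values in Power Query or by writing a new query: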
let
Source = ListInstancesDMS("Geography", "City", "1")
in
Source
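GraphQL requests
When you fetch data from CDF with the GraphQL function, you must write a GraphQL request to select which data to fetch from a specific data model. The function expects the external ID of the space, the external ID of the view, the version of the view, the GraphQL query to run, and optionally a set of variables to use in the query.
The query below uses GraphQL syntax and passes the variables as JSON text. Using variables makes the query easier to parameterize and to combine with external values.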
let
Source = GraphQL(
"cdf_idm",
"CogniteProcessIndustries",
"v1",
"query MyQuery($cursor: String, $endTime: Timestamp) {#(lf) listCogniteMaintenanceOrder(#(lf) first: 1000#(lf) after: $cursor#(lf) filter: {endTime: {gte: $endTime}}#(lf) ) {#(lf) items {#(lf) name#(lf) type#(lf) startTime#(lf) endTime#(lf) priority#(lf) }#(lf) pageInfo {#(lf) endCursor#(lf) hasNextPage#(lf) }#(lf) }#(lf)}",
"{""endTime"": ""2024-10-01T00:00:00+02:00""}"
)
in
Source
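The M language used by Power Query currently doesn't support multiline strings, so the query must be written on a single line. #(lf) represents a line-break character. In the example above, the query was pasted into the text area field in Power Query and the variables were passed as JSON text. Note how Power BI added the line breaks to the original query and expressed it as a single-line text value.
Alternatively, you can write the query as multiple single-line texts and use the Text.Combine function to add the line breaks. You can define the variables as a Power Query record and convert it to JSON text before passing it to the GraphQL function. For example, the query below uses the ConvertDateTimeZoneToIso function to convert a datetimezone value to ISO 8601 text and then passes it to the query as a variable.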
let
// This could be a parameter or referenced from another query
EndTime = #datetimezone(2024, 10, 1, 0, 0, 0, 2, 0),
VariablesRecord = [
endTime = ConvertDateTimeZoneToIso(EndTime)
],
VariablesText = Text.FromBinary(Json.FromValue(VariablesRecord)),
Query = Text.Combine({
"query MyQuery($cursor: String, $endTime: Timestamp) {",
" listCogniteMaintenanceOrder(",
" first: 1000",
" after: $cursor",
" filter: {endTime: {gte: $endTime}}",
" ) {",
" items {",
" name",
" type",
" startTime",
" endTime",
" priority",
" }",
" pageInfo {",
" endCursor",
" hasNextPage",
" }",
" }",
"}"
}, "#(lf)"),
Data = GraphQL(
"cdf_idm",
"CogniteProcessIndustries",
"v1",
Query,
VariablesText
)
in
Data
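Advanced examples
Depending on the shape of the Cognite API response, you may need additional Power Query transformations to fetch and transform the data. Copy and adapt the examples below to fit your business needs.
Fetch sequence rows with the PostCDF function
The Power Query function below fetches and processes the sequence rows for a given sequence external ID from CDF. It sends a POST request, extracts the column information, expands the nested response data, and reorganizes it into a tabular format. The function handles data type conversion, removes unnecessary fields, and groups the data by row. The final output is a well-structured table with correctly typed columns.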
(externalId as text) as table =>
let
RequestBody = "{""externalId"": """ & externalId & """, ""limit"": 10000}",
Response = PostCDF("/sequences/data/list", RequestBody),
// Extract columns information from the first page
FirstPage = Response{0},
Columns = FirstPage[columns],
ColumnNames = List.Transform(Columns, each [externalId]),
ColumnTypes = List.Transform(Columns, each
if [valueType] = "STRING" then type text else
if [valueType] = "DOUBLE" then type number else
if [valueType] = "LONG" then Int64.Type
else type any
),
// Extract the 'values' from each row
Rows = Table.ExpandListColumn(Response, "rows"),
ValuesTable = Table.ExpandRecordColumn(Rows, "rows", {"rowNumber", "values"}, {"rows.rowNumber", "rows.values"}),
RemoveColumns = Table.RemoveColumns(ValuesTable,{"id", "externalId", "columns", "nextCursor"}),
ExpandValues = Table.ExpandListColumn(RemoveColumns, "rows.values"),
// Group by rowNumber and create a record for each row
GroupedRows = Table.Group(ExpandValues, {"rows.rowNumber"}, {
{"RowData", (t) => Record.FromList(t[rows.values], ColumnNames)}
}),
// Expand the RowData column
ExpandRows = Table.ExpandRecordColumn(GroupedRows, "RowData", ColumnNames),
// Set column data types
FinalTable = Table.TransformColumnTypes(ExpandRows, List.Zip({ColumnNames, ColumnTypes}))
in
FinalTable
To use the function (named RetrieveSequenceRows in this example):
let
Source = RetrieveSequenceRows("sequence-externalId")
in
Source
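The function above builds its request body by concatenating text, which breaks if the external ID contains a double quote or a backslash. A possible alternative, in line with the record-based POST examples earlier on this page, is to build the body as a record and serialize it. The sketch below shows only that step as a standalone helper; the name BuildSequenceRowsBody is hypothetical.

```powerquery
// Hypothetical helper: returns the JSON request body as text
(externalId as text) as text =>
let
    // Json.FromValue escapes special characters in externalId
    Body = [externalId = externalId, limit = 10000],
    RequestBody = Text.FromBinary(Json.FromValue(Body))
in
    RequestBody
```

The returned text can be passed to PostCDF in place of the concatenated RequestBody value.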
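Fetch instances from the DMS query endpoint with the PostCDF function
The Power Query function below retrieves and processes data modeling instances for a DMS query. It paginates through the response, extracts the instances, and expands the nested data.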
(query as text) as table =>
let
FetchPage = (query as text, optional cursors as nullable record) as table =>
let
Query = Json.Document(query),
UpdatedQuery =
if cursors <> null then
let
// Get all field names of both records
QueryWithFields = Record.FieldNames(Query[with]),
QuerySelectFields = Record.FieldNames(Query[select]),
CursorsFields = Record.FieldNames(cursors),
// Find the intersection of field names
CommonFields = List.Intersect({QueryWithFields, QuerySelectFields, CursorsFields}),
// Create new records containing only the common fields
UpdatedQueryWithAndSelect = Record.TransformFields(
Query,
{
{"with", each Record.SelectFields(_, CommonFields)},
{"select", each Record.SelectFields(_, CommonFields)}
}
)
in
UpdatedQueryWithAndSelect
else
Query,
// Add cursors if they are provided
UpdatedQueryWithCursors =
if cursors <> null then
Record.AddField(UpdatedQuery, "cursors", cursors)
else
UpdatedQuery,
FinalBody = Text.FromBinary(Json.FromValue(UpdatedQueryWithCursors)),
Response = PostCDF("/models/instances/query", FinalBody)
in
Response,
// Helper function to create next cursor record from result table
CreateNextCursorRecordFromTable = (inputTable as table) as record =>
let
RecordsList = List.Transform(
Table.ToRecords(inputTable), each Record.FromList({[nextCursor]}, {[resultExpression]})
),
CombinedRecord = Record.Combine(RecordsList)
in
CombinedRecord,
// Helper function to check if all cursors are null
AllCursorsNull = (cursorsRecord as record) as logical =>
let
CursorValues = Record.ToList(cursorsRecord),
NullCount = List.Count(List.Select(CursorValues, each _ = null))
in
NullCount = List.Count(CursorValues),
// Helper function to aggregate items from all pages and convert to tables
AggregateResults = (results as list) as table =>
let
// Combine all tables
CombinedTable = Table.Combine(results),
// Group by resultExpression and convert items to tables
GroupedTable = Table.Group(
CombinedTable,
{"resultExpression"},
{
{
"items",
each
Table.FromRecords(
List.Combine(List.Transform([items], each if Value.Is(_, type list) then _ else {
_
}))
),
type table
}
}
)
in
GroupedTable,
// Main pagination logic
FetchAllPages = () as list =>
let
// Initialize accumulator
InitialAcc = [
results = {},
currentCursors = null,
hasMore = true
],
// Pagination function
PaginationFunction = (acc as record) =>
let
CurrentPage = FetchPage(query, acc[currentCursors]),
NextCursors = CreateNextCursorRecordFromTable(CurrentPage),
HasMoreResults = not AllCursorsNull(NextCursors) and Table.RowCount(CurrentPage) > 0,
UpdatedResults = List.Combine({acc[results], {CurrentPage}})
in
[
results = UpdatedResults,
currentCursors = NextCursors,
hasMore = HasMoreResults
],
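// Keep fetching until no more results.
// List.Generate does not emit the state that fails its condition, so every state holds
// a fetched page and a null state ends the loop. This keeps the last page in the output.
AllResults = List.Generate(
() => PaginationFunction(InitialAcc),
each _ <> null,
each if _[hasMore] then PaginationFunction(_) else null,
each _[results]
),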
// Get the final list of results
FinalResults = List.Last(AllResults)
in
FinalResults,
// Execute pagination and combine results
AllPages = FetchAllPages(),
FinalTable = AggregateResults(AllPages)
in
FinalTable
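The pagination in this function relies on List.Generate, which stops at the first state that fails its condition and does not emit that state. A pagination loop therefore has to make sure the final page is emitted before the loop ends. The self-contained sketch below illustrates one way to do that without calling CDF: FakePages and FetchFakePage are hypothetical stand-ins for an API that returns items and a nextCursor.

```powerquery
let
    // Hypothetical stand-in for an API: three pages keyed by cursor
    FakePages = [
        start = [items = {1, 2}, nextCursor = "c1"],
        c1 = [items = {3, 4}, nextCursor = "c2"],
        c2 = [items = {5}, nextCursor = null]
    ],
    FetchFakePage = (cursor as nullable text) as record =>
        Record.Field(FakePages, if cursor = null then "start" else cursor),
    // Each state is one fetched page; a null state ends the generation
    Pages = List.Generate(
        () => FetchFakePage(null),
        each _ <> null,
        each if [nextCursor] <> null then FetchFakePage([nextCursor]) else null,
        each [items]
    ),
    AllItems = List.Combine(Pages)
in
    AllItems
```

Because the page without a next cursor is still a non-null state, it is emitted before the loop ends, and AllItems contains the items of all three pages.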
To use the function (named QueryDMS in this example):
let
Query = [
with = [
cities = [
nodes = [
filter = [
hasData = {
[
space = "Geography",
externalId = "City",
version = "1",
#"type" = "view"
]
}
],
chainTo = "destination",
direction = "outwards"
]
],
countries = [
nodes = [
filter = [
hasData = {
[
space = "Geography",
externalId = "Country",
version = "1",
#"type" = "view"
]
}
],
chainTo = "destination",
direction = "outwards"
]
]
],
select = [
cities = [
sources = {
[
source = [
space = "Geography",
externalId = "City",
version = "1",
#"type" = "view"
],
properties = {
"name"
}
]
}
],
countries = [
sources = {
[
source = [
space = "Geography",
externalId = "Country",
version = "1",
#"type" = "view"
],
properties = {
"name"
}
]
}
]
]
],
QueryText = Text.FromBinary(Json.FromValue(Query)),
Source = QueryDMS(QueryText)
in
Source
Add a filter, such as the hasData filter in the example above, to avoid fetching all instances from CDF.
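Fetch time series datapoints with the PostCDF function
The Power Query function below retrieves and processes aggregated datapoints for a time series within a time range. It converts local time zone inputs to UTC for the CDF API requests, supports multiple aggregates and custom granularity, and handles pagination. The function then converts the returned UTC timestamps back to the local time zone, expands the nested API response, and outputs a well-formatted table with correctly typed columns, including local timestamps and decimal aggregate values.
The example uses the ConvertDateTimeZoneToMs and ConvertMsToDateTimeZone functions to convert timestamps.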
(
item as record,
start as datetimezone,
optional end as nullable datetimezone,
optional aggregates as nullable text,
optional granularity as nullable text,
optional targetUnit as nullable text,
optional targetUnitSystem as nullable text,
optional timeZone as nullable text
) =>
let
// Function to detect query type based on item record structure
DetectQueryType = (item as record) =>
let
Fields = Record.FieldNames(item),
HasId = List.Contains(Fields, "id"),
HasExternalId = List.Contains(Fields, "externalId"),
HasSpace = List.Contains(Fields, "space"),
FieldCount = List.Count(Fields),
QueryType =
if HasId and not HasExternalId and not HasSpace and FieldCount = 1 then
"id"
else if HasExternalId and not HasId and not HasSpace and FieldCount = 1 then
"externalId"
else if HasExternalId and HasSpace and not HasId and FieldCount = 2 then
"instanceId"
else
// Raise the error; Error.Record on its own only builds a record value
error Error.Record(
"Invalid item content", "The item record does not match any supported query type", item
)
in
QueryType,
// Detect query type
queryType = DetectQueryType(item),
// Determine limit based on presence of aggregates
limit = if aggregates <> null then 10000 else 100000,
// Convert aggregates from comma-separated string to list format accepted by the API
AggregatesList = Text.Split(aggregates, ","),
AggregatesTrimmedList = List.Transform(AggregatesList, each Text.Trim(_)),
StartMs = Number.Round(ConvertDateTimeZoneToMs(start)),
EndMs = Number.Round(ConvertDateTimeZoneToMs(end)),
// Function to fetch a single page of data
FetchPage = (cursor as nullable text) =>
let
// Build body item
bodyItem =
if queryType = "id" then
[id = Record.Field(item, "id")]
& (if targetUnit <> null then [targetUnit = targetUnit] else [])
& (if targetUnitSystem <> null then [targetUnitSystem = targetUnitSystem] else [])
& (if cursor <> null then [cursor = cursor] else [])
else if queryType = "externalId" then
[externalId = Record.Field(item, "externalId")]
& (if targetUnit <> null then [targetUnit = targetUnit] else [])
& (if targetUnitSystem <> null then [targetUnitSystem = targetUnitSystem] else [])
& (if cursor <> null then [cursor = cursor] else [])
else if queryType = "instanceId" then
[ instanceId = [ space = Record.Field(item, "space"), externalId = Record.Field(item, "externalId") ] ]
& (if targetUnit <> null then [targetUnit = targetUnit] else [])
& (if targetUnitSystem <> null then [targetUnitSystem = targetUnitSystem] else [])
& (if cursor <> null then [cursor = cursor] else [])
else
error "Invalid query type",
// Build request body
body = [
items = {bodyItem},
limit = limit,
ignoreUnknownIds = true,
start = Text.From(StartMs)
]
& (if end <> null then [end = Text.From(EndMs)] else [])
& (if aggregates <> null then [aggregates = AggregatesTrimmedList] else [])
& (if granularity <> null then [granularity = granularity] else [])
& (if timeZone <> null then [timeZone = timeZone] else []),
Response = PostCDF("/timeseries/data/list", Text.FromBinary(Json.FromValue(body))),
// Try to fetch the cursor from the first item in the response
FirstItem =
if Type.Is(Value.Type(Response), type table) and Table.RowCount(Response) > 0 then
Table.First(Response)
else
null,
NextCursor = if FirstItem <> null then Record.FieldOrDefault(FirstItem, "nextCursor", null) else null,
// Handles empty response and extracts data points when present
FinalItemsList =
if Table.HasColumns(Response, "datapoints") then
let
// Clean up the response table
ColumnsToRemove = {"nextCursor", "isStep", "unit"},
ColumnsPresent = List.Intersect({Table.ColumnNames(Response), ColumnsToRemove}),
CleanedTable = Table.RemoveColumns(Response, ColumnsPresent),
// Expand the "datapoints" column
ExpandedDatapointsList = Table.ExpandListColumn(CleanedTable, "datapoints"),
// Handles the case where the list of "datapoints" is empty
FinalDataPointsList =
if List.NonNullCount(ExpandedDatapointsList[datapoints]) > 0 then
let
// Extract a sample record to determine available fields dynamically
SampleRecord = ExpandedDatapointsList[datapoints]{0},
AvailableFields = Record.FieldNames(SampleRecord),
// Expand the "datapoints" records using the available fields
ExpandedDatapointsRecords = Table.ExpandRecordColumn(
ExpandedDatapointsList, "datapoints", AvailableFields, AvailableFields
),
DataPointsList = Table.ToRecords(ExpandedDatapointsRecords)
in
DataPointsList
else
{}
in
FinalDataPointsList
else
Table.ToRecords(Response)
in
{FinalItemsList, NextCursor},
// Recursive function to accumulate all pages of data
AccumulateData = (cursor as nullable text, accumulatedItems as list) =>
let
CurrentPage = FetchPage(cursor),
NewItems = CurrentPage{0},
NextCursor = CurrentPage{1},
UpdatedAccumulatedItems = accumulatedItems & NewItems,
Result =
if NextCursor <> null then
@AccumulateData(NextCursor, UpdatedAccumulatedItems)
else
UpdatedAccumulatedItems
in
Result,
// Fetch all data
AllItems = AccumulateData(null, {}),
// Convert the accumulated items to a table
ConvertToTable =
if List.IsEmpty(AllItems) then
Table.FromList({}, Splitter.SplitByNothing(), null, null, ExtraValues.Error)
else
Table.FromList(AllItems, Splitter.SplitByNothing(), null, null, ExtraValues.Error),
// Expand the table column and convert timestamps
ExpandedTable =
if not Table.IsEmpty(ConvertToTable) and Table.HasColumns(ConvertToTable, "Column1") then
let
TmpTable = Table.ExpandRecordColumn(
ConvertToTable, "Column1", Record.FieldNames(ConvertToTable{0}[Column1])
),
// timestamp should be always present when there are datapoints
FixType = Table.TransformColumnTypes(TmpTable, {{"timestamp", Int64.Type}}),
ParseTimestamp = Table.TransformColumns(FixType, {"timestamp", each ConvertMsToDateTimeZone(_)}),
ParsedWithType = Table.TransformColumnTypes(ParseTimestamp, {{"timestamp", type datetimezone}}),
// check if the timeseries is of type string
FirstEntry = ParsedWithType{0},
IsString = FirstEntry[isString],
CleanedTable = Table.RemoveColumns(ParsedWithType, {"isString"}),
// Convert aggregate/value columns to decimal number
ValuesAsDecimal =
if aggregates <> null then
Table.TransformColumnTypes(
CleanedTable, List.Transform(AggregatesTrimmedList, each {_, type number})
)
else if IsString then
CleanedTable
else
Table.TransformColumnTypes(
CleanedTable, List.Transform({"value"}, each {_, type number})
),
// Check if "id" column is present and convert to integer
IdAsInteger =
if Table.HasColumns(ValuesAsDecimal, "id") then
Table.TransformColumnTypes(ValuesAsDecimal, {{"id", Int64.Type}})
else
ValuesAsDecimal
in
IdAsInteger
else
ConvertToTable
in
ExpandedTable
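This function is more complex than the previous examples and handles several scenarios, including pagination, data type conversion, and nested data expansion. To use the function (named RetrieveDataPoints in this example):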
let
Source = RetrieveDataPoints(
[ externalId = "EVE-TI-FORNEBU-01-3" ],
#datetimezone(2024, 10, 1, 0, 0, 0, 2, 0),
#datetimezone(2024, 10, 13, 10, 0, 0, 2, 0),
"average,max,min",
"1d",
null,
"SI",
"Europe/Oslo"
)
in
Source
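The item record decides how the time series is identified: a single id field, a single externalId field, or a space and externalId pair for an instance ID. The sketch below shows the two forms not used in the example above. The ID, space, and external ID values are hypothetical, and the optional arguments are left out, so the function requests raw datapoints from the start time onward.

```powerquery
let
    Start = #datetimezone(2024, 10, 1, 0, 0, 0, 2, 0),
    // Internal ID (hypothetical value)
    ById = RetrieveDataPoints([id = 1234567890], Start),
    // Data modeling instance ID (hypothetical space and external ID)
    ByInstanceId = RetrieveDataPoints(
        [space = "my_space", externalId = "my_time_series"],
        Start
    ),
    Result = [ById = ById, ByInstanceId = ByInstanceId]
in
    Result
```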
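Based on this function, you can create another function that iterates over a list of time series external IDs and combines the results into one large table. The list can be a column in another table where you, for example, filter the time series. You can adapt the function to iterate over a list of internal IDs or instance IDs.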
(
externalIds as list,
start as datetimezone,
end as datetimezone,
aggregates as text,
granularity as text,
optional targetUnitSystem as nullable text,
optional timeZone as nullable text
) =>
let
// Iterate over each externalId and get corresponding table
TablesList = List.Transform(
externalIds,
each RetrieveDataPoints(
[ externalId = _ ],
start,
end,
aggregates,
granularity,
null,
targetUnitSystem,
timeZone
)
),
// Combine all tables into one
CombinedTable = Table.Combine(TablesList)
in
CombinedTable
To use the function (named RetrieveDataPointsMultipleTs in this example):
let
Source = RetrieveDataPointsMultipleTs(
{"EVE-TI-FORNEBU-01-2", "EVE-TI-FORNEBU-01-3"},
#datetimezone(2024, 10, 1, 0, 0, 0, 2, 0),
#datetimezone(2024, 10, 13, 10, 0, 0, 2, 0),
"average,max,min",
"1d",
"SI",
"Europe/Oslo"
)
in
Source
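To take the list of external IDs from a column in another table, one option is to fetch the time series first and pass the column on. The sketch below assumes that GetCDF returns a table with an externalId column for the /timeseries endpoint; the prefix filter is the one used earlier on this page.

```powerquery
let
    // Assumption: GetCDF returns a table with an externalId column
    TimeSeries = GetCDF("/timeseries?externalIdPrefix=EVE&limit=1000"),
    // Time series without an external ID have a null value; drop those
    ExternalIds = List.RemoveNulls(TimeSeries[externalId]),
    Source = RetrieveDataPointsMultipleTs(
        ExternalIds,
        #datetimezone(2024, 10, 1, 0, 0, 0, 2, 0),
        #datetimezone(2024, 10, 13, 10, 0, 0, 2, 0),
        "average,max,min",
        "1d",
        "SI",
        "Europe/Oslo"
    )
in
    Source
```

Each external ID results in at least one request, so it is worth narrowing the filter before combining many time series.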
- Microsoft: Power Query documentation