Power Query 関数とサンプル クエリ
Cognite Data Fusion (REST API) connector for Power BI で公開されている関数を Power Query と組み合わせて、Cognite API でデータを取得および変換し、Microsoft Power BI および Microsoft Excel でレポート ダッシュボードを作成します。
このセクションで説明されている機能は現在、特定のお客様を対象にベータ テスト中であり、変更される可能性があります。
このページのクエリと Power Query 関数をコピーして、ビジネス ニーズに合わせて調整してください。
Utility 関数
エポックとの間でのタイムスタンプの変換
CDF リソース タイプは、_Unix エポックからのミリ秒単位_のタイムスタンプの使用を期待し、返します。Power Query には、この形式をタイムゾーン対応のタイムスタンプを表す datetimezone
タイプに自動的に解析する方法がありません。CDF データ モデルは、プリミティブ フィールドのタイムスタンプを ISO 8601 形式で表します。
以下の関数を使用して、datetimezone
変数と _Unix エポックからのミリ秒_との間、および datetimezone
変数から ISO 8601 形式のテキストへの変換を行います。
ConvertDateTimeZoneToMs
//
(dtz as datetimezone) as number =>
let
// Convert the input DateTimeZone to UTC
UtcDateTime = DateTimeZone.RemoveZone(DateTimeZone.SwitchZone(dtz, 0)),
// Define the Unix epoch start
UnixEpochStart = #datetime(1970, 1, 1, 0, 0, 0),
// Calculate the duration between the input date and Unix epoch start
Delta = UtcDateTime - UnixEpochStart,
// Convert duration to total milliseconds
TotalMilliseconds = Duration.TotalSeconds(Delta) * 1000
in
TotalMilliseconds
ConvertMsToDateTimeZone
(ms as number) as datetimezone =>
let
// Convert ms to seconds
SecondsSinceEpoch = ms / 1000,
// Create a duration
DurationSinceEpoch = #duration(0, 0, 0, SecondsSinceEpoch),
// Add duration to Unix epoch start to get UTC datetime
UnixEpochStart = #datetime(1970, 1, 1, 0, 0, 0),
UtcDateTime = UnixEpochStart + DurationSinceEpoch,
// Convert UTC datetime to local time zone
LocalDateTimeZone = DateTimeZone.From(UtcDateTime)
in
LocalDateTimeZone
ConvertDateTimeZoneToIso
(dtz as datetimezone) as text =>
let
// Use DateTimeZone.ToText with ISO 8601 format
Result = DateTimeZone.ToText(dtz, [Format="yyyy-MM-ddTHH:mm:sszzz", Culture="en-US"])
in
Result
関数の追加
Power Query に新しい関数を追加するには、Get Data > Blank Query を選択し、関数を記述するか、以下の関数のいずれかをコピーします。
時間差
通常は、時間差に基づいて開始と終了のタイムスタンプを定義します。データセットが更新されると、値も更新されます。 以下の例では、EndTime
は現在の時刻、StartTime
は EndTime
の 7 日前です。異なる時間差を使用するようにこの例を調整できます。
CurrentTime = DateTimeZone.LocalNow(),
EndTime = CurrentTime,
StartTime = CurrentTime - #duration(7, 0, 0, 0)
一般的な GET リクエスト
CDF からデータを取得する際に、CDF REST API connector for Power BI の GetCDF
] 関数を使用する場合は、取得するデータを指定するフィルターを渡すためにクエリ パラメータを使用する必要があります。
下の例では、サーバー側でデータをフィルタリングするために、URL に /timeseries
エンドポイントから externalIdPrefix
および limit
クエリ パラメータを追加する方法を示しています。
let
Source = GetCDF("/timeseries?externalIdPrefix=EVE&limit=1000")
in
Source
一般的な POST リクエスト
PostCDF
] 関数で CDF からデータを取得する際には、取得するデータを指定するリクエスト ボディを記述する必要があります。関数は JSON ボディのテキスト表現を受け入れますが、Power Query レコード データ型を使用してボディを記述し、その値を PostCDF
関数に渡す前に JSON テキスト データ型に変換することもできます。
let
SpaceExternalId = "Geography",
ViewExternalId = "City",
ViewVersion = "1",
Body = [
sources = {
[
source = [
type = "view",
space = SpaceExternalId,
externalId = ViewExternalId,
version = ViewVersion
]
]
},
limit = 1000
],
BodyText = Text.FromBinary(Json.FromValue(Body)),
Source = PostCDF("/models/instances/list", BodyText)
in
Source
または、POST ボディを手動でテキストとして記述することもできますが、二重引用符("
)をもう一組の二重引用符文字でエスケープする必要があります。
let
BodyText = "{""sources"": [{""source"": {""type"": ""view"", ""space"": ""Geography"", ""externalId"": ""City"", ""version"": ""1""}}], ""limit"": 1000}",
Source = PostCDF("/models/instances/list", BodyText)
in
Source
POST リクエストを再利用する必要がある場合は、それを Power Query 関数に変換できます。例:
(SpaceExternalId as text, ViewExternalId as text, ViewVersion as text) as table =>
let
Body = [
sources = {
[
source = [
type = "view",
space = SpaceExternalId,
externalId = ViewExternalId,
version = ViewVersion
]
]
},
limit = 1000
],
BodyText = Text.FromBinary(Json.FromValue(Body)),
Source = PostCDF("/models/instances/list", BodyText)
in
Source
Power Query エディターのクエリ リストのエントリを右クリックして Rename を選択することで、関数名を定義できます。
上記の関数が ListInstancesDMS
という名前の場合、Power Query のフィールド値を入力するか、新しいクエリを記述することで、新しいクエリでそれを使用できます。
let
Source = ListInstancesDMS("Geography", "City", "1")
in
Source
GraphQL リクエスト
GraphQL
] 関数を使用して CDF からデータを取得する際には、特定のデータ モデルから取得するデータを指定する GraphQL リクエストを記述する必要があります。この関数では、space の外部 ID、view の外部 ID、view のバージョン、実行する GraphQL クエリ、およびオプションとしてクエリで使用する変数のセットを指定する必要があります。
下のクエリは GraphQL 構文を使用し、変数を JSON テキストとして渡します。クエリに変数を使用すると、外部の値でパラメータ化しやすくなり、使用しやすくなります。
let
Source = GraphQL(
"cdf_idm",
"CogniteProcessIndustries",
"v1",
"query MyQuery($cursor: String, $endTime: Timestamp) {#(lf) listCogniteMaintenanceOrder(#(lf) first: 1000#(lf) after: $cursor#(lf) filter: {endTime: {gte: $endTime}}#(lf) ) {#(lf) items {#(lf) name#(lf) type#(lf) startTime#(lf) endTime#(lf) priority#(lf) }#(lf) pageInfo {#(lf) endCursor#(lf) hasNextPage#(lf) }#(lf) }#(lf)}",
"{""endTime"": ""2024-10-01T00:00:00+02:00""}"
)
in
Source
Power Query で使用されている M 言語は現在、複数行の文字列をサポートしていないため、クエリは 1 行にする必要があります。#(lf)
は改行文字を表します。上記の例では、クエリは Power Query のテキスト領域フィールドに貼り付けられ、変数は JSON テキストとして渡されました。 Power BI が元のクエリに改行を追加し、それを 1 行のテキスト変数として表現している点に注目してください。
あるいは、クエリを複数の 1 行テキストとして記述し、Text.Combine
関数を使用してクエリに改行を追加することもできます。 変数を Power Query レコードとして定義し、JSON テキストに変換してから GraphQL
関数に渡すこともできます。たとえば、ConvertDateTimeZoneToIso
] 関数がどのように datetimezone
変数を ISO 8601 形式のテキスト表現に変換し、それを変数としてクエリに渡すかを確認してください。
let
// This could be a parameter or referenced from another query
EndTime = #datetimezone(2024, 10, 1, 0, 0, 0, 2, 0),
VariablesRecord = [
endTime = ConvertDateTimeZoneToIso(EndTime)
],
VariablesText = Text.FromBinary(Json.FromValue(VariablesRecord)),
Query = Text.Combine({
"query MyQuery($cursor: String, $endTime: Timestamp) {",
" listCogniteMaintenanceOrder(",
" first: 1000",
" after: $cursor",
" filter: {endTime: {gte: $endTime}}",
" ) {",
" items {",
" name",
" type",
" startTime",
" endTime",
" priority",
" }",
" pageInfo {",
" endCursor",
" hasNextPage",
" }",
" }",
"}"
}, "#(lf)"),
Data = GraphQL(
"cdf_idm",
"CogniteProcessIndustries",
"v1",
Query,
VariablesText
)
in
Data
高度な例
Cognite API レスポンスの形によっては、データを取得し変換するために、追加の Power Query 変換が必要になる場合があります。 以下の例をコピーして、ビジネス ニーズに合わせて変更してください。
PostCDF
関数でシーケンス行を取得
下の Power Query 関数は、CDF からシーケンス外部 ID のシーケンス行データを取得し、処理します。POST リクエストを送信し、列情報を抽出し、ネストされたレスポンス データを展開し、表形式に再構成します。この関数は、データ型の変換、不要なフィールドの削除、行ごとのデータのグループ化を行います。最終出力は、適切なデータ型で列が設定された、整然としたテーブルです。
(externalId as text) as table =>
let
RequestBody = "{""externalId"": """ & externalId & """, ""limit"": 10000}",
Response = PostCDF("/sequences/data/list", RequestBody),
// Extract columns information from the first page
FirstPage = Response{0},
Columns = FirstPage[columns],
ColumnNames = List.Transform(Columns, each [externalId]),
ColumnTypes = List.Transform(Columns, each
if [valueType] = "STRING" then type text else
if [valueType] = "DOUBLE" then type number else
if [valueType] = "LONG" then Int64.Type
else type any
),
// Extract the 'values' from each row
Rows = Table.ExpandListColumn(Response, "rows"),
ValuesTable = Table.ExpandRecordColumn(Rows, "rows", {"rowNumber", "values"}, {"rows.rowNumber", "rows.values"}),
RemoveColumns = Table.RemoveColumns(ValuesTable,{"id", "externalId", "columns", "nextCursor"}),
ExpandValues = Table.ExpandListColumn(RemoveColumns, "rows.values"),
// Group by rowNumber and create a record for each row
GroupedRows = Table.Group(ExpandValues, {"rows.rowNumber"}, {
{"RowData", (t) => Record.FromList(t[rows.values], ColumnNames)}
}),
// Expand the RowData column
ExpandRows = Table.ExpandRecordColumn(GroupedRows, "RowData", ColumnNames),
// Set column data types
FinalTable = Table.TransformColumnTypes(ExpandRows, List.Zip({ColumnNames, ColumnTypes}))
in
FinalTable
この関数を使用するには、以下を参考にしてください。
let
Source = RetrieveSequenceRows("sequence-externalId")
in
Source
PostCDF
関数で、DMS クエリ エンドポイントからインスタンスを取得します
下の Power Query 関数は、DMS クエリ用のデータ モデリング インスタンスを取得し、処理します。レスポンスをページングし、インスタンスを抽出し、ネストされたデータを展開します。
(query as text) as table =>
let
FetchPage = (query as text, optional cursors as nullable record) as table =>
let
Query = Json.Document(query),
UpdatedQuery =
if cursors <> null then
let
// Get all field names of both records
QueryWithFields = Record.FieldNames(Query[with]),
QUerySelectFields = Record.FieldNames(Query[select]),
CursorsFields = Record.FieldNames(cursors),
// Find the intersection of field names
CommonFields = List.Intersect({QueryWithFields, QUerySelectFields, CursorsFields}),
// Create new records containing only the common fields
UpdatedQueryWithAndSelect = Record.TransformFields(
Query,
{
{"with", each Record.SelectFields(_, CommonFields)},
{"select", each Record.SelectFields(_, CommonFields)}
}
)
in
UpdatedQueryWithAndSelect
else
Query,
// Add cursors if they are provided
UpdatedQueryWithCursors =
if cursors <> null then
Record.AddField(UpdatedQuery, "cursors", cursors)
else
UpdatedQuery,
FinalBody = Text.FromBinary(Json.FromValue(UpdatedQueryWithCursors)),
Response = PostCDF("/models/instances/query", FinalBody)
in
Response,
// Helper function to create next cursor record from result table
CreateNextCursorRecordFromTable = (inputTable as table) as record =>
let
RecordsList = List.Transform(
Table.ToRecords(inputTable), each Record.FromList({[nextCursor]}, {[resultExpression]})
),
CombinedRecord = Record.Combine(RecordsList)
in
CombinedRecord,
// Helper function to check if all cursors are null
AllCursorsNull = (cursorsRecord as record) as logical =>
let
CursorValues = Record.ToList(cursorsRecord),
NullCount = List.Count(List.Select(CursorValues, each _ = null))
in
NullCount = List.Count(CursorValues),
// Helper function to aggregate items from all pages and convert to tables
AggregateResults = (results as list) as table =>
let
// Combine all tables
CombinedTable = Table.Combine(results),
// Group by resultExpression and convert items to tables
GroupedTable = Table.Group(
CombinedTable,
{"resultExpression"},
{
{
"items",
each
Table.FromRecords(
List.Combine(List.Transform([items], each if Value.Is(_, type list) then _ else {
_
}))
),
type table
}
}
)
in
GroupedTable,
// Main pagination logic
FetchAllPages = () as list =>
let
// Initialize accumulator
InitialAcc = [
results = {},
currentCursors = null,
hasMore = true
],
// Pagination function
PaginationFunction = (acc as record) =>
let
CurrentPage = FetchPage(query, acc[currentCursors]),
NextCursors = CreateNextCursorRecordFromTable(CurrentPage),
HasMoreResults = not AllCursorsNull(NextCursors) and Table.RowCount(CurrentPage) > 0,
UpdatedResults = List.Combine({acc[results], {CurrentPage}})
in
[
results = UpdatedResults,
currentCursors = NextCursors,
hasMore = HasMoreResults
],
// Keep fetching until no more results
AllResults = List.Generate(
() => InitialAcc, each _[hasMore], each PaginationFunction(_), each _[results]
),
// Get the final list of results
FinalResults = List.Last(AllResults)
in
FinalResults,
// Execute pagination and combine results
AllPages = FetchAllPages(),
FinalTable = AggregateResults(AllPages)
in
FinalTable
この関数を使用するには、以下を参考にしてください。
let
Query = [
with = [
cities = [
nodes = [
filter = [
hasData = {
[
space = "Geography",
externalId = "City",
version = "1",
#"type" = "view"
]
}
],
chainTo = "destination",
direction = "outwards"
]
],
countries = [
nodes = [
filter = [
hasData = {
[
space = "Geography",
externalId = "Country",
version = "1",
#"type" = "view"
]
}
],
chainTo = "destination",
direction = "outwards"
]
]
],
select = [
cities = [
sources = {
[
source = [
space = "Geography",
externalId = "City",
version = "1",
#"type" = "view"
],
properties = {
"name"
}
]
}
],
countries = [
sources = {
[
source = [
space = "Geography",
externalId = "Country",
version = "1",
#"type" = "view"
],
properties = {
"name"
}
]
}
]
]
],
QueryText = Text.FromBinary(Json.FromValue(Query)),
Source = QueryDMS(QueryText)
in
Source
上の例の hasData
フィルターのようなフィルターを追加して、CDF からすべてのインスタンスを取得しないようにします。
PostCDF
関数で時系列データ ポイントを取得
下の Power Query 関数は、時間範囲内の時系列データポイントを集計し、処理します。CDF API リクエストのローカル タイムゾーン入力を UTC に変換し、複数の集計とカスタム粒度をサポートし、データのページングを処理します。関数は、返された UTC タイムスタンプをローカル タイムゾーンに変換し、ネストされた API レスポンスを展開し、適切なデータ型で列が設定され、適切にフォーマットされたテーブルを出力します。 また、ローカル タイムスタンプと小数点以下の集計値も含まれます。
この例では、タイムスタンプの変換に ConvertDateTimeZoneToMs
および ConvertMsToDateTimeZone
関数を使用しています。
(
item as record,
start as datetimezone,
optional end as nullable datetimezone,
optional aggregates as nullable text,
optional granularity as nullable text,
optional targetUnit as nullable text,
optional targetUnitSystem as nullable text,
optional timeZone as nullable text
) =>
let
// Function to detect query type based on item record structure
DetectQueryType = (item as record) =>
let
Fields = Record.FieldNames(item),
HasId = List.Contains(Fields, "id"),
HasExternalId = List.Contains(Fields, "externalId"),
HasSpace = List.Contains(Fields, "space"),
FieldCount = List.Count(Fields),
QueryType =
if HasId and not HasExternalId and not HasSpace and FieldCount = 1 then
"id"
else if HasExternalId and not HasId and not HasSpace and FieldCount = 1 then
"externalId"
else if HasExternalId and HasSpace and not HasId and FieldCount = 2 then
"instanceId"
else
Error.Record(
"Invalid item content", "The item record does not match any supported query type", item
)
in
QueryType,
// Detect query type
queryType = DetectQueryType(item),
// Determine limit based on presence of aggregates
limit = if aggregates <> null then 10000 else 100000,
// Convert aggregates from comma-separated string to list format accepted by the API
AggregatesList = Text.Split(aggregates, ","),
AggregatesTrimmedList = List.Transform(AggregatesList, each Text.Trim(_)),
StartMs = Number.Round(ConvertDateTimeZoneToMs(start)),
EndMs = Number.Round(ConvertDateTimeZoneToMs(end)),
// Function to fetch a single page of data
FetchPage = (cursor as nullable text) =>
let
// Build body item
bodyItem =
if queryType = "id" then
[id = Record.Field(item, "id")]
& (if targetUnit <> null then [targetUnit = targetUnit] else [])
& (if targetUnitSystem <> null then [targetUnitSystem = targetUnitSystem] else [])
& (if cursor <> null then [cursor = cursor] else [])
else if queryType = "externalId" then
[externalId = Record.Field(item, "externalId")]
& (if targetUnit <> null then [targetUnit = targetUnit] else [])
& (if targetUnitSystem <> null then [targetUnitSystem = targetUnitSystem] else [])
& (if cursor <> null then [cursor = cursor] else [])
else if queryType = "instanceId" then
[ instanceId = [ space = Record.Field(item, "space"), externalId = Record.Field(item, "externalId") ] ]
& (if targetUnit <> null then [targetUnit = targetUnit] else [])
& (if targetUnitSystem <> null then [targetUnitSystem = targetUnitSystem] else [])
& (if cursor <> null then [cursor = cursor] else [])
else
error "Invalid query type",
// Build request body
body = [
items = {bodyItem},
limit = limit,
ignoreUnknownIds = true,
start = Text.From(StartMs)
]
& (if end <> null then [end = Text.From(EndMs)] else [])
& (if aggregates <> null then [aggregates = AggregatesTrimmedList] else [])
& (if granularity <> null then [granularity = granularity] else [])
& (if timeZone <> null then [timeZone = timeZone] else []),
Response = PostCDF("/timeseries/data/list", Text.FromBinary(Json.FromValue(body))),
// Try to fetch the cursor from the first item in the response
FirstItem =
if Type.Is(Value.Type(Response), type table) and Table.RowCount(Response) > 0 then
Table.First(Response)
else
null,
NextCursor = if FirstItem <> null then Record.FieldOrDefault(FirstItem, "nextCursor", null) else null,
// Handles empty response and extracts data points when present
FinalItemsList =
if Table.HasColumns(Response, "datapoints") then
let
// Clean up the response table
ColumnsToRemove = {"nextCursor", "isStep", "unit"},
ColumnsPresent = List.Intersect({Table.ColumnNames(Response), ColumnsToRemove}),
CleanedTable = Table.RemoveColumns(Response, ColumnsPresent),
// Expand the "datapoints" column
ExpandedDatapointsList = Table.ExpandListColumn(CleanedTable, "datapoints"),
// Handles the case where the list of "datapoints" is empty
FinalDataPointsList =
if List.NonNullCount(ExpandedDatapointsList[datapoints]) > 0 then
let
// Extract a sample record to determine available fields dynamically
SampleRecord = ExpandedDatapointsList[datapoints]{0},
AvailableFields = Record.FieldNames(SampleRecord),
// Expand the "datapoints" records using the available fields
ExpandedDatapointsRecords = Table.ExpandRecordColumn(
ExpandedDatapointsList, "datapoints", AvailableFields, AvailableFields
),
DataPointsList = Table.ToRecords(ExpandedDatapointsRecords)
in
DataPointsList
else
{}
in
FinalDataPointsList
else
Table.ToRecords(Response)
in
{FinalItemsList, NextCursor},
// Recursive function to accumulate all pages of data
AccumulateData = (cursor as nullable text, accumulatedItems as list) =>
let
CurrentPage = FetchPage(cursor),
NewItems = CurrentPage{0},
NextCursor = CurrentPage{1},
UpdatedAccumulatedItems = accumulatedItems & NewItems,
Result =
if NextCursor <> null then
@AccumulateData(NextCursor, UpdatedAccumulatedItems)
else
UpdatedAccumulatedItems
in
Result,
// Fetch all data
AllItems = AccumulateData(null, {}),
// Convert the accumulated items to a table
ConvertToTable =
if List.IsEmpty(AllItems) then
Table.FromList({}, Splitter.SplitByNothing(), null, null, ExtraValues.Error)
else
Table.FromList(AllItems, Splitter.SplitByNothing(), null, null, ExtraValues.Error),
// Expand the table column and convert timestamps
ExpandedTable =
if not Table.IsEmpty(ConvertToTable) and Table.HasColumns(ConvertToTable, "Column1") then
let
TmpTable = Table.ExpandRecordColumn(
ConvertToTable, "Column1", Record.FieldNames(ConvertToTable{0}[Column1])
),
// timestamp should be always present when there are datapoints
FixType = Table.TransformColumnTypes(TmpTable, {{"timestamp", Int64.Type}}),
ParseTimestamp = Table.TransformColumns(FixType, {"timestamp", each ConvertMsToDateTimeZone(_)}),
ParsedWithType = Table.TransformColumnTypes(ParseTimestamp, {{"timestamp", type datetimezone}}),
// check if the timeseries is of type string
FirstEntry = ParsedWithType{0},
IsString = FirstEntry[isString],
CleanedTable = Table.RemoveColumns(ParsedWithType, {"isString"}),
// Convert aggregate/value columns to decimal number
ValuesAsDecimal =
if aggregates <> null then
Table.TransformColumnTypes(
CleanedTable, List.Transform(AggregatesTrimmedList, each {_, type number})
)
else if IsString then
CleanedTable
else
Table.TransformColumnTypes(
CleanedTable, List.Transform({"value"}, each {_, type number})
),
// Check if "id" column is present and convert to integer
IdAsInteger =
if Table.HasColumns(ValuesAsDecimal, "id") then
Table.TransformColumnTypes(ValuesAsDecimal, {{"id", Int64.Type}})
else
ValuesAsDecimal
in
IdAsInteger
else
ConvertToTable
in
ExpandedTable
この関数は、これまでの例よりも複雑で、ページング、データ型変換、ネストされたデータの展開など、さまざまなシナリオに対応します。 この関数を使用するには、以下を参考にしてください。
let
Source = RetrieveDataPoints(
[ externalId = "EVE-TI-FORNEBU-01-3" ],
#datetimezone(2024, 10, 1, 0, 0, 0, 2, 0),
#datetimezone(2024, 10, 13, 10, 0, 0, 2, 0),
"average,max,min",
"1d",
null,
"SI",
"Europe/Oslo"
)
in
Source
この関数に基づいて、時系列外部 ID のリストを反復処理し、結果を大きなテーブルに結合する別の関数を作成することができます。このリストは、時系列のフィルタリングなどを行う別のテーブルの列とすることができます。この関数を、内部 ID またはインスタンス ID のリストを反復処理するように調整することもできます。
(
externalIds as list,
start as datetimezone,
end as datetimezone,
aggregates as text,
granularity as text,
optional targetUnitSystem as nullable text,
optional timeZone as nullable text
) =>
let
// Iterate over each externalId and get corresponding table
TablesList = List.Transform(
externalIds,
each RetrieveDataPoints(
[ externalId = _ ],
start,
end,
aggregates,
granularity,
null,
targetUnitSystem,
timeZone
)
),
// Combine all tables into one
CombinedTable = Table.Combine(TablesList)
in
CombinedTable
この関数を使用するには、以下を参考にしてください。
let
Source = RetrieveDataPointsMultipleTs(
{"EVE-TI-FORNEBU-01-2", "EVE-TI-FORNEBU-01-3"},
#datetimezone(2024, 10, 1, 0, 0, 0, 2, 0),
#datetimezone(2024, 10, 13, 10, 0, 0, 2, 0),
"average,max,min",
"1d",
"SI",
"Europe/Oslo"
)
in
Source
- Microsoft:
Power Queryドキュメント