Ult*_*ine 3 apache-kafka ksqldb
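I'm trying to join two existing Kafka topics in KSQL. Here are some data samples from Kafka (actual values have been redacted because this is a corporate environment):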
Device topic:
{
"persistTime" : "2020-10-06T13:30:25.373Z",
"previous" : {
"device" : "REDACTED",
"type" : "REDACTED",
"group" : "REDACTED",
"inventoryState" : "unknown",
"managementState" : "registered",
"communicationId" : "REDACTED",
"manufacturer" : "",
"description" : "",
"model" : "",
"location" : {
"geo" : {
"latitude" : "REDACTED",
"longitude" : "REDACTED"
},
"address" : {
"city" : "",
"postalCode" : "",
"street" : "",
"houseNumber" : "",
"floor" : "",
"company" : "",
"country" : "",
"reference" : "",
"timeZone" : "",
"region" : "",
"district" : ""
},
"logicalInstallationPoint" : ""
},
"tags" : [ ]
},
"current" : {
"device" : "REDACTED",
"type" : "REDACTED",
"group" : "REDACTED",
"inventoryState" : "unknown",
"managementState" : "registered",
"communicationId" : "REDACTED",
"manufacturer" : "",
"description" : "",
"model" : "",
"location" : {
"geo" : {
"latitude" : "REDACTED",
"longitude" : "REDACTED"
},
"address" : {
"city" : "",
"postalCode" : "",
"street" : "",
"houseNumber" : "",
"floor" : "",
"company" : "",
"country" : "",
"reference" : "",
"timeZone" : "",
"region" : "",
"district" : ""
},
"logicalInstallationPoint" : ""
},
"tags" : [ ]
}
}
Device-event topic (first example):
{
"device" : "REDACTED",
"event" : "403151",
"firstOccurrenceTime" : "2020-09-30T11:03:50.000Z",
"lastOccurrenceTime" : "2020-09-30T11:03:50.000Z",
"occurrenceCount" : 1,
"receiveTime" : "2020-09-30T11:03:50.000Z",
"persistTime" : "2020-09-30T14:32:59.580Z",
"state" : "open",
"context" : {
"2" : "25",
"3" : "0",
"4" : "60",
"1" : "REDACTED"
}
}
Device-event topic (second example):
{
"device" : "REDACTED",
"event" : "402004",
"firstOccurrenceTime" : "2020-10-07T07:02:48Z",
"lastOccurrenceTime" : "2020-10-07T07:02:48Z",
"occurrenceCount" : 1,
"receiveTime" : "2020-10-07T07:02:48Z",
"persistTime" : "2020-10-07T07:15:28.533Z",
"state" : "open",
"context" : {
"2" : "2020-10-07T07:02:48.0000000Z",
"1" : "REDACTED"
}
}
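The problem I'm facing is that the number of variables under context in the device-event topic varies from record to record.
I've tried creating an events stream in ksqlDB with each of the following statements: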
CREATE STREAM "events"\
("device" VARCHAR, \
"event" VARCHAR, \
"firstOccurenceTime" VARCHAR, \
"lastOccurenceTime" VARCHAR, \
"occurenceCount" INTEGER, \
"receiveTime" VARCHAR, \
"persistTime" VARCHAR, \
"state" VARCHAR, \
"context" ARRAY<STRING>) \
WITH (KAFKA_TOPIC='device-event', VALUE_FORMAT='JSON');
CREATE STREAM "events"\
("device" VARCHAR, \
"event" VARCHAR, \
"firstOccurenceTime" VARCHAR, \
"lastOccurenceTime" VARCHAR, \
"occurenceCount" INTEGER, \
"receiveTime" VARCHAR, \
"persistTime" VARCHAR, \
"state" VARCHAR, \
"context" STRUCT\
<"1" VARCHAR, \
"2" VARCHAR, \
"3" VARCHAR, \
"4" VARCHAR>) \
WITH (KAFKA_TOPIC='ext_device-event_10195', VALUE_FORMAT='JSON');
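The second statement only ingests the records that contain all four context variables ("1", "2", "3" and "4").
How can I create an equivalent KSQL stream for the device-event Kafka topic?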
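You need to use a MAP instead of a STRUCT.
By the way, you no longer need the \ line continuations either :)
Here's a working example using ksqlDB 0.12.
Load the sample data into a topic: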
kafkacat -b localhost:9092 -P -t events <<EOF
{ "device" : "REDACTED", "event" : "403151", "firstOccurrenceTime" : "2020-09-30T11:03:50.000Z", "lastOccurrenceTime" : "2020-09-30T11:03:50.000Z", "occurrenceCount" : 1, "receiveTime" : "2020-09-30T11:03:50.000Z", "persistTime" : "2020-09-30T14:32:59.580Z", "state" : "open", "context" : { "2" : "25", "3" : "0", "4" : "60", "1" : "REDACTED" } }
{ "device" : "REDACTED", "event" : "402004", "firstOccurrenceTime" : "2020-10-07T07:02:48Z", "lastOccurrenceTime" : "2020-10-07T07:02:48Z", "occurrenceCount" : 1, "receiveTime" : "2020-10-07T07:02:48Z", "persistTime" : "2020-10-07T07:15:28.533Z", "state" : "open", "context" : { "2" : "2020-10-07T07:02:48.0000000Z", "1" : "REDACTED" } }
EOF
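In producer mode, kafkacat treats each input line as a separate message (its default message delimiter is a newline). That is why the pretty-printed records from the question are collapsed onto single lines above. The following Python sketch performs the same collapse. The record is trimmed for brevity, and to_single_line is a hypothetical helper, not part of any tool.

```python
import json

# One pretty-printed record as shown in the question (trimmed for brevity).
PRETTY = """{
  "device" : "REDACTED",
  "event" : "402004",
  "occurrenceCount" : 1,
  "context" : {
    "2" : "2020-10-07T07:02:48.0000000Z",
    "1" : "REDACTED"
  }
}"""


def to_single_line(pretty: str) -> str:
    """Re-serialize one JSON document without newlines so kafkacat sends it as one message."""
    # json.dumps without indent emits no newlines and keeps the original key order.
    return json.dumps(json.loads(pretty))


line = to_single_line(PRETTY)
print(line)
```

The printed lines could then be piped into the same kafkacat -P command instead of using the heredoc.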
In ksqlDB, declare the stream:
CREATE STREAM "events" (
"device" VARCHAR,
"event" VARCHAR,
-- Spelled exactly as in the JSON payload ("Occurrence", double r); a misspelled column would simply read as null
"firstOccurrenceTime" VARCHAR,
"lastOccurrenceTime" VARCHAR,
"occurrenceCount" INTEGER,
"receiveTime" VARCHAR,
"persistTime" VARCHAR,
"state" VARCHAR,
"context" MAP < VARCHAR, VARCHAR >
) WITH (KAFKA_TOPIC = 'events', VALUE_FORMAT = 'JSON');
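The reason a MAP fits can be seen by inspecting the two sample records. The following Python sketch is an illustration only, not ksqlDB code. It shows that the set of context keys differs from one record to the next, while every context value is a JSON string, which matches the MAP<VARCHAR, VARCHAR> declaration. The records are trimmed to the relevant fields, and context_shape is a hypothetical helper.

```python
import json

# The two device-event samples from the question, trimmed to the relevant fields.
SAMPLES = [
    '{"device": "REDACTED", "event": "403151", "context": {"2": "25", "3": "0", "4": "60", "1": "REDACTED"}}',
    '{"device": "REDACTED", "event": "402004", "context": {"2": "2020-10-07T07:02:48.0000000Z", "1": "REDACTED"}}',
]


def context_shape(raw: str) -> tuple:
    """Return (set of context keys, whether every context value is a string)."""
    context = json.loads(raw)["context"]
    return frozenset(context), all(isinstance(v, str) for v in context.values())


shapes = [context_shape(s) for s in SAMPLES]
for keys, all_strings in shapes:
    print(sorted(keys), all_strings)
```

A fixed set of struct fields has to name every key up front. A string-to-string map, like a Python dict, simply holds whichever keys each record happens to carry.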
Query the stream to check that it works:
ksql> SET 'auto.offset.reset' = 'earliest';
Successfully changed local property 'auto.offset.reset' to 'earliest'. Use the UNSET command to revert your change.
ksql> SELECT "device", "event", "receiveTime", "state", "context" FROM "events" EMIT CHANGES;
+----------+--------+--------------------------+--------+------------------------------------+
|device |event |receiveTime |state |context |
+----------+--------+--------------------------+--------+------------------------------------+
|REDACTED |403151 |2020-09-30T11:03:50.000Z |open |{1=REDACTED, 2=25, 3=0, 4=60} |
|REDACTED |402004 |2020-10-07T07:02:48Z |open |{1=REDACTED, 2=2020-10-07T07:02:48.0|
| | | | |000000Z} |
Use the [''] syntax to access specific keys in the map:
ksql> SELECT "device", "event", "context", "context"['1'] AS CONTEXT_1, "context"['3'] AS CONTEXT_3 FROM "events" EMIT CHANGES;
+-----------+--------+------------------------------------+-----------+-----------+
|device |event |context |CONTEXT_1 |CONTEXT_3 |
+-----------+--------+------------------------------------+-----------+-----------+
|REDACTED |403151 |{1=REDACTED, 2=25, 3=0, 4=60} |REDACTED |0 |
|REDACTED |402004 |{1=REDACTED, 2=2020-10-07T07:02:48.0|REDACTED |null |
| | |000000Z} | | |
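The map-lookup behaviour in the query above can be mimicked outside ksqlDB. The following Python sketch is only an analogy, not how ksqlDB evaluates the query. Looking up a key that is absent from a record's map yields None, which corresponds to the null in the CONTEXT_3 column for event 402004. The project helper and the trimmed records are illustrative assumptions.

```python
import json

# Single-line versions of the two sample events, trimmed to the fields queried above.
EVENTS = [
    '{"device": "REDACTED", "event": "403151", "context": {"2": "25", "3": "0", "4": "60", "1": "REDACTED"}}',
    '{"device": "REDACTED", "event": "402004", "context": {"2": "2020-10-07T07:02:48.0000000Z", "1": "REDACTED"}}',
]


def project(raw: str) -> dict:
    """Rough analogue of: SELECT "event", "context"['1'] AS CONTEXT_1, "context"['3'] AS CONTEXT_3."""
    record = json.loads(raw)
    context = record.get("context") or {}
    return {
        "event": record["event"],
        # dict.get returns None for a missing key, much like the null in the ksqlDB output.
        "CONTEXT_1": context.get("1"),
        "CONTEXT_3": context.get("3"),
    }


rows = [project(e) for e in EVENTS]
for row in rows:
    print(row)
```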