so-*_*ude 4 apache-kafka ksqldb
有没有办法使用 KSQL 将一条消息拆分为多条消息并发布到新主题。需要明确的是,我并不是在寻找基于 Java 的侦听器并将其迭代/流式传输到新主题;而是在寻找基于 Java 的侦听器并将其迭代/流式传输到新主题。相反,我正在寻找一个可以为我做到这一点的 KSQL。
例如:
比方说,我需要将invoice主题中的消息拆分为item_inventory_delta消息
键:销售支票号码
消息示例:
{
"total": 12.33,
"salecounter": 1,
"items": [
{
"itemId": 123,
"quantity": 1
},
{
"itemId": 345,
"quantity": 5
}
]
}
Run Code Online (Sandbox Code Playgroud)
key : saleschecknumber_itemID
消息示例
{
"itemId": 123,
"quantity": 1
}
Run Code Online (Sandbox Code Playgroud)
2.
{
"itemId": 345,
"quantity": 5
}
Run Code Online (Sandbox Code Playgroud)
从 ksqlDB 0.6 开始,由于添加了EXPLODEtable 函数,您现在可以执行此操作。
给定一个invoice带有 JSON 负载的主题,首先使用PRINT转储其内容来检查该主题:
ksql> PRINT invoice FROM BEGINNING;
Format:JSON
{"ROWTIME":1575366231505,"ROWKEY":"null","total":12.33,"salecounter":1,"items":[{"itemId":123,"quantity":1},{"itemId":345,"quantity":5}]}
Run Code Online (Sandbox Code Playgroud)
然后声明一个关于该主题的模式,这为我们提供了一个 ksqlDB流:
CREATE STREAM INVOICE (total DOUBLE,
salecounter INT,
items ARRAY<STRUCT<itemId INT,
quantity INT>>)
WITH (KAFKA_TOPIC='invoice',
VALUE_FORMAT='JSON');
Run Code Online (Sandbox Code Playgroud)
这只是“注册”现有主题以与 ksqlDB 一起使用。在下一步之前不会编写新的 Kafka 主题。
创建一个新的 Kafka 主题,不断地从源流中到达的消息填充:
CREATE STREAM INVENTORY WITH (KAFKA_TOPIC='item_inventory_delta') AS
SELECT EXPLODE(ITEMS)->ITEMID AS ITEMID,
EXPLODE(ITEMS)->QUANTITY AS QUANTITY
FROM INVOICE;
Run Code Online (Sandbox Code Playgroud)
新主题已创建:
ksql> SHOW TOPICS;
Kafka Topic | Partitions | Partition Replicas
-------------------------------------------------------------------
invoice | 1 | 1
item_inventory_delta | 1 | 1
Run Code Online (Sandbox Code Playgroud)
主题有请求的增量消息:)
ksql> PRINT item_inventory_delta;
Format:JSON
{"ROWTIME":1575366231505,"ROWKEY":"null","ITEMID":123,"QUANTITY":1}
{"ROWTIME":1575366231505,"ROWKEY":"null","ITEMID":345,"QUANTITY":5}
Run Code Online (Sandbox Code Playgroud)
| 归档时间: |
|
| 查看次数: |
3920 次 |
| 最近记录: |