Kafka 流 - KSQL - 拆分消息并发布到另一个主题

so-*_*ude 4 apache-kafka ksqldb

有没有办法使用 KSQL 将一条消息拆分为多条消息并发布到新主题。需要明确的是,我并不是在寻找基于 Java 的侦听器并将其迭代/流式传输到新主题;而是在寻找基于 Java 的侦听器并将其迭代/流式传输到新主题。相反,我正在寻找一个可以为我做到这一点的 KSQL。

例如:

比方说,我需要将invoice主题中的消息拆分为item_inventory_delta消息

发票主题

:销售支票号码

消息示例:

{
    "total": 12.33,
    "salecounter": 1,
    "items": [
        {
            "itemId": 123,
            "quantity": 1
        },
        {
            "itemId": 345,
            "quantity": 5
        }
    ]
}
Run Code Online (Sandbox Code Playgroud)

item_inventory_delta主题

key : saleschecknumber_itemID

消息示例

1.

{
    "itemId": 123,
    "quantity": 1
}
Run Code Online (Sandbox Code Playgroud)

2.

{
    "itemId": 345,
    "quantity": 5
}
Run Code Online (Sandbox Code Playgroud)

Rob*_*att 9

从 ksqlDB 0.6 开始,由于添加EXPLODEtable 函数,您现在可以执行此操作。

给定一个invoice带有 JSON 负载的主题,首先使用PRINT转储其内容来检查该主题:

ksql> PRINT invoice FROM BEGINNING;
Format:JSON
{"ROWTIME":1575366231505,"ROWKEY":"null","total":12.33,"salecounter":1,"items":[{"itemId":123,"quantity":1},{"itemId":345,"quantity":5}]}
Run Code Online (Sandbox Code Playgroud)

然后声明一个关于该主题的模式,这为我们提供了一个 ksqlDB

CREATE STREAM INVOICE (total DOUBLE, 
                       salecounter INT, 
                       items ARRAY<STRUCT<itemId INT, 
                                          quantity INT>>) 
                WITH (KAFKA_TOPIC='invoice', 
                      VALUE_FORMAT='JSON');
Run Code Online (Sandbox Code Playgroud)

这只是“注册”现有主题以与 ksqlDB 一起使用。在下一步之前不会编写新的 Kafka 主题。

创建一个新的 Kafka 主题,不断地从源流中到达的消息填充:

CREATE STREAM INVENTORY WITH (KAFKA_TOPIC='item_inventory_delta') AS 
  SELECT EXPLODE(ITEMS)->ITEMID AS ITEMID, 
         EXPLODE(ITEMS)->QUANTITY AS QUANTITY 
    FROM INVOICE;
Run Code Online (Sandbox Code Playgroud)

新主题已创建:

ksql> SHOW TOPICS;

 Kafka Topic                     | Partitions | Partition Replicas
-------------------------------------------------------------------
 invoice                         | 1          | 1
 item_inventory_delta            | 1          | 1
Run Code Online (Sandbox Code Playgroud)

主题有请求的增量消息:)

ksql> PRINT item_inventory_delta;
Format:JSON
{"ROWTIME":1575366231505,"ROWKEY":"null","ITEMID":123,"QUANTITY":1}
{"ROWTIME":1575366231505,"ROWKEY":"null","ITEMID":345,"QUANTITY":5}
Run Code Online (Sandbox Code Playgroud)