合并kafka流中的记录

liv*_*ves 2 apache-kafka kafka-consumer-api apache-kafka-streams confluent-platform

是否可以合并kafka中的记录并将输出发布到不同的流?

例如,有一个针对 kafka 主题的事件流,如下所示

{txnId:1,startTime:0900},{txnId:1,endTime:0905},{txnId:2,endTime:0912},{txnId:3,endTime:0930},{txnId:2,startTime:0912}, {txnId:3,开始时间:0925}......

我想通过 txnId 合并这些事件并创建合并的输出,如下所示

{txnId:1,startTime:0900,endTime:0905},{txnId:2,startTime:0910,endTime:0912},{txnId:3,startTime:0925,endTime:0930}

请注意,传入事件中不会维护顺序。因此,如果在开始时间事件之前收到 txn Id 的 endTime,那么我们需要等到收到该 txnId 的开始时间事件后再启动合并

我浏览了 Kafka Streams 示例附带的字数示例,但不清楚如何等待事件,然后在进行转换时合并。

任何想法都受到高度赞赏。

小智 5

您可以尝试通过将开始和结束事件分成 2 个单独的流(以 txnId 作为键)然后加入两个流来解决此问题。

KStream<String, String> eventSource = new StreamBuilder().stream("INPUT-TOPIC");

KStream<String, JsonNode>[] splitEvents = 
          eventSource.map((key, eventString) -> {
                           JsonNode event = new ObjectMapper().readTree(eventString);
                           String txnId = event.path("txnId").asText();
                           return KeyValue.pair(txnId, event);
                        })
                     .branch((key, event) -> event.findValue("startTime") != null,
                             (key, event) -> event.findValue("endTime") != null); 


KStream<String, JsonNode> startEvents = splitEvents[0];
KStream<String, JsonNode> endEvents = splitEvents[1];
Run Code Online (Sandbox Code Playgroud)

当连接的任一侧都有事件时,如图所示的 2 个流之间的连接将产生连接结果。因此,两个事件的顺序并不重要(您必须确保为连接设置适当的窗口期)。

Serde<JsonNode> jsonSerde = Serdes.serdeFrom(new JsonSerializer(), new JsonDeserializer());

KStream<String, String> completeEvents = startEvents.join(endEvents, 
               (startEvent, endEvent) -> {
                    // Add logic to merge startEvent and endEvent as seen fit
                       ObjectNode completeEvent = JsonNodeFactory.instance.objectNode();
                       completeEvent.put("startTime",  startEvent.path("startTime).asText());
                       completeEvent.put("endTime",  endEvent.path("endTime").asText());
                       return completeEvent.toString();
                },
               JoinWindows.of(Duration.ofMinutes(15)),
               Joined.with(
                    Serdes.String(),   // key
                    jsonSerde,         // left object
                    jsonSerde          // right object
               )
          );
Run Code Online (Sandbox Code Playgroud)