liv*_*ves 2 apache-kafka kafka-consumer-api apache-kafka-streams confluent-platform
是否可以合并kafka中的记录并将输出发布到不同的流?
例如,有一个针对 kafka 主题的事件流,如下所示
{txnId:1,startTime:0900},{txnId:1,endTime:0905},{txnId:2,endTime:0912},{txnId:3,endTime:0930},{txnId:2,startTime:0912}, {txnId:3,开始时间:0925}......
我想通过 txnId 合并这些事件并创建合并的输出,如下所示
{txnId:1,startTime:0900,endTime:0905},{txnId:2,startTime:0910,endTime:0912},{txnId:3,startTime:0925,endTime:0930}
请注意,传入事件中不会维护顺序。因此,如果在开始时间事件之前收到 txn Id 的 endTime,那么我们需要等到收到该 txnId 的开始时间事件后再启动合并
我浏览了 Kafka Streams 示例附带的字数示例,但不清楚如何等待事件,然后在进行转换时合并。
任何想法都受到高度赞赏。
小智 5
您可以尝试通过将开始和结束事件分成 2 个单独的流(以 txnId 作为键)然后加入两个流来解决此问题。
KStream<String, String> eventSource = new StreamBuilder().stream("INPUT-TOPIC");
KStream<String, JsonNode>[] splitEvents =
eventSource.map((key, eventString) -> {
JsonNode event = new ObjectMapper().readTree(eventString);
String txnId = event.path("txnId").asText();
return KeyValue.pair(txnId, event);
})
.branch((key, event) -> event.findValue("startTime") != null,
(key, event) -> event.findValue("endTime") != null);
KStream<String, JsonNode> startEvents = splitEvents[0];
KStream<String, JsonNode> endEvents = splitEvents[1];
Run Code Online (Sandbox Code Playgroud)
当连接的任一侧都有事件时,如图所示的 2 个流之间的连接将产生连接结果。因此,两个事件的顺序并不重要(您必须确保为连接设置适当的窗口期)。
Serde<JsonNode> jsonSerde = Serdes.serdeFrom(new JsonSerializer(), new JsonDeserializer());
KStream<String, String> completeEvents = startEvents.join(endEvents,
(startEvent, endEvent) -> {
// Add logic to merge startEvent and endEvent as seen fit
ObjectNode completeEvent = JsonNodeFactory.instance.objectNode();
completeEvent.put("startTime", startEvent.path("startTime).asText());
completeEvent.put("endTime", endEvent.path("endTime").asText());
return completeEvent.toString();
},
JoinWindows.of(Duration.ofMinutes(15)),
Joined.with(
Serdes.String(), // key
jsonSerde, // left object
jsonSerde // right object
)
);
Run Code Online (Sandbox Code Playgroud)
| 归档时间: |
|
| 查看次数: |
2852 次 |
| 最近记录: |