小编Gru*_*tak的帖子

使用kafka流获取时间窗口中给定键的最后一个事件

我开始使用 KStream 来使用来自现有主题的数据。

我只对在 10 秒窗口内获取给定 ID 的最后一个事件感兴趣。我尝试使用以下代码:

StreamsBuilder builder = new StreamsBuilder();
KStream<String, MySale> stream = builder.stream(INPUT_TOPIC, Consumed.with(Serdes.String(), specificAvroSerde));

stream.selectKey((key, value) -> value.getID())
    .groupByKey()
    .windowedBy(TimeWindows.of(Duration.ofSeconds(10)))
    .reduce((value1, value2) -> value2)
    .toStream()
    .peek((key, value) -> log.info("key={}, value={}", key, value.toString()))
    .to(OUTPUT_TOPIC, Produced.with(Serdes.String(), specificAvroSerde));
Run Code Online (Sandbox Code Playgroud)

但我最终得到了所有事件,而不仅仅是最后一个。使用 KStream 可以做我想做的事吗?

java dsl stream apache-kafka apache-kafka-streams

1
推荐指数
1
解决办法
1283
查看次数

标签 统计

apache-kafka ×1

apache-kafka-streams ×1

dsl ×1

java ×1

stream ×1