相关疑难解决方法(0)

如何发送时间窗口KTable的最终kafka-streams聚合结果?

我想做的是:

  1. 从数字主题(Long的)消费记录
  2. 聚合(计数)每个5秒窗口的值
  3. 将FINAL聚合结果发送到另一个主题

我的代码看起来像这样:

KStream<String, Long> longs = builder.stream(
        Serdes.String(), Serdes.Long(), "longs");

// In one ktable, count by key, on a five second tumbling window.
KTable<Windowed<String>, Long> longCounts = 
        longs.countByKey(TimeWindows.of("longCounts", 5000L));

// Finally, sink to the long-avgs topic.
longCounts.toStream((wk, v) -> wk.key())
        .to("long-counts");
Run Code Online (Sandbox Code Playgroud)

看起来一切都按预期工作,但聚合被发送到每个传入记录的目标主题.我的问题是如何只发送每个窗口的最终聚合结果?

apache-kafka apache-kafka-streams

31
推荐指数
2
解决办法
1万
查看次数

标签 统计

apache-kafka ×1

apache-kafka-streams ×1