如何在窗口结束时输出窗口聚合的结果?

use*_*741 7 apache-kafka apache-kafka-streams

我有一个KStream我想要计算事件的某些方面.我按如下方式做到:

KTable<Windowed<Long>, Counter> ret = input.groupByKey()
  .windowedBy(TimeWindows.of(Duration.of(10, SECONDS)))
  .aggregate(Counter::new, (k, v, c) -> new Counter(c.count + v.getDimension()));
Run Code Online (Sandbox Code Playgroud)

我希望有一个新KStream的聚合作为事件.我可以这样轻松地做到:

ret.toStream().to("output");
Run Code Online (Sandbox Code Playgroud)

问题是"输入"主题中的每个事件都会产生一个"输出"主题的事件.我想仅在窗口完成时将事件发布到输出主题.例如,如果窗口是一分钟,则每分钟每个键发送一个事件.

我想我可以这样做:

ret.toStream().foreach((k, v) -> sendToKafkaTopic("output"));
Run Code Online (Sandbox Code Playgroud)

但我想知道是否有更好/更优雅的方式呢?

Ste*_*cek 9

您可以在2.1版中使用KTable KTable.suppress的新功能

此方法允许您为窗口计算获得每个窗口/键的最终结果.

更多关于suppresKIP-328

您可以suppress像这样更新您的实现:

KTable<Windowed<Long>, Counter> ret = input.groupByKey()
        .windowedBy(TimeWindows.of(Duration.of(10, SECONDS)))
        .aggregate(Counter::new, (k, v, c) -> new Counter(c.count + v.getDimension()))
        .suppress(untilWindowCloses(BufferConfig.unbounded()));

ret.toStream().to("output"); // now stream should flush events to the output topic only when the window closes
Run Code Online (Sandbox Code Playgroud)

  • 如果在 TimeWindows 上没有调用 `grace(..)` 方法,则使用 `suppress(..)` 的聚合似乎根本不会传播消息 (2认同)