如何发送时间窗口KTable的最终kafka-streams聚合结果?

oda*_*vid 31 apache-kafka apache-kafka-streams

我想做的是:

  1. 从数字主题(Long的)消费记录
  2. 聚合(计数)每个5秒窗口的值
  3. 将FINAL聚合结果发送到另一个主题

我的代码看起来像这样:

KStream<String, Long> longs = builder.stream(
        Serdes.String(), Serdes.Long(), "longs");

// In one ktable, count by key, on a five second tumbling window.
KTable<Windowed<String>, Long> longCounts = 
        longs.countByKey(TimeWindows.of("longCounts", 5000L));

// Finally, sink to the long-avgs topic.
longCounts.toStream((wk, v) -> wk.key())
        .to("long-counts");
Run Code Online (Sandbox Code Playgroud)

看起来一切都按预期工作,但聚合被发送到每个传入记录的目标主题.我的问题是如何只发送每个窗口的最终聚合结果?

Mat*_*Sax 25

在Kafka Streams中,没有"最终聚合"这样的东西.Windows始终处于打开状态以处理迟到的记录(当然窗口不会永久保存,它们会被丢弃直到它们的保留时间到期 - 但是,当窗口被丢弃时没有特殊操作).

有关更多详细信息,请参阅Confluent文档:http://docs.confluent.io/current/streams/

因此,对于聚合的每次更新,都会生成结果记录(因为Kafka Streams还会更新迟到记录的聚合结果).您的"最终结果"将是最新的结果记录(在窗口被丢弃之前).根据您的使用情况,手动重复数据删除将是解决问题的一种方法(使用低杠杆API,transform()process())

这篇博客文章也许有帮助:https://timothyrenner.github.io/engineering/2016/08/11/kafka-streams-not-looking-at-facebook.html

另一篇不使用标点符号解决此问题的博客文章:http://blog.inovatrend.com/2018/03/making-of-message-gateway-with-kafka.html

更新

使用KIP-328,KTable#suppress()添加了一个运算符,允许以严格的方式抑制连续更新,并在每个窗口发出一个结果记录; 权衡是增加延迟.

  • 对于我们的凡人Dimitry你有这个实现吗?:) (3认同)

Ami*_*aki 5

从 Kafka Streams 2.1 版开始,您可以使用 suppress.

提到的 apache Kafka Streams 文档中有一个示例,当用户在一小时内发生的事件少于三个时,它会发送警报:

KGroupedStream<UserId, Event> grouped = ...;
grouped
  .windowedBy(TimeWindows.of(Duration.ofHours(1)).grace(ofMinutes(10)))
  .count()
  .suppress(Suppressed.untilWindowCloses(unbounded()))
  .filter((windowedUserId, count) -> count < 3)
  .toStream()
  .foreach((windowedUserId, count) -> sendAlert(windowedUserId.window(), windowedUserId.key(), count));
Run Code Online (Sandbox Code Playgroud)

如此答案的更新中所述,您应该了解权衡。此外,请注意suppress() 基于事件时间。