相关疑难解决方法(0)

如何发送时间窗口KTable的最终kafka-streams聚合结果?

我想做的是:

  1. 从数字主题(Long的)消费记录
  2. 聚合(计数)每个5秒窗口的值
  3. 将FINAL聚合结果发送到另一个主题

我的代码看起来像这样:

KStream<String, Long> longs = builder.stream(
        Serdes.String(), Serdes.Long(), "longs");

// In one ktable, count by key, on a five second tumbling window.
KTable<Windowed<String>, Long> longCounts = 
        longs.countByKey(TimeWindows.of("longCounts", 5000L));

// Finally, sink to the long-avgs topic.
longCounts.toStream((wk, v) -> wk.key())
        .to("long-counts");
Run Code Online (Sandbox Code Playgroud)

看起来一切都按预期工作,但聚合被发送到每个传入记录的目标主题.我的问题是如何只发送每个窗口的最终聚合结果?

apache-kafka apache-kafka-streams

31
推荐指数
2
解决办法
1万
查看次数

Apache Kafka Streams将KTables物化为一个主题似乎很慢

我正在使用kafka流,我正在尝试将KTable实现为主题.

它工作但似乎每30秒左右完成一次.

Kafka Stream如何/何时决定将KTable的当前状态实现为主题?

有没有办法缩短这个时间并使其更"实时"?

这是我正在使用的实际代码

// Stream of random ints: (1,1) -> (6,6) -> (3,3)
// one record every 500ms
KStream<Integer, Integer> kStream = builder.stream(Serdes.Integer(), Serdes.Integer(), RandomNumberProducer.TOPIC);

// grouping by key
KGroupedStream<Integer, Integer> byKey = kStream.groupByKey(Serdes.Integer(), Serdes.Integer());

// same behaviour with or without the TimeWindow
KTable<Windowed<Integer>, Long> count = byKey.count(TimeWindows.of(1000L),"total");

// same behaviour with only count.to(Serdes.Integer(), Serdes.Long(), RandomCountConsumer.TOPIC);
count.toStream().map((k,v) -> new KeyValue<>(k.key(), v)).to(Serdes.Integer(), Serdes.Long(), RandomCountConsumer.TOPIC);
Run Code Online (Sandbox Code Playgroud)

stream reactive-programming apache-kafka

11
推荐指数
1
解决办法
3150
查看次数