edd*_*P23 3 apache-kafka apache-kafka-streams
我的Kafka Streams聚合读取了一个紧凑的主题并执行此操作:
(0_10, ..), (0_11, ..) ---> (0, [10]) (0, [10, 11])
我想知道如何控制聚合时间窗口,因此它不会为每个传入的消息发送消息,而是等待并聚合其中的一些消息.Imagine Stream App使用以下消息:
(0_10, ..)(1_11, ..)(0_13, ..)如果以前的3条消息在短时间内到达,我希望看到:
(0,[10])(0, [10, 13])(1, [11])我无法弄清楚,在吐出新值之前,如何告诉我的Kafka Stream应用程序等待更多聚合需要多长时间.
我的代码非常简单
builder
.table(keySerde, valueSerde, sourceTopic)
.groupBy(StreamBuilder::groupByMapper)
.aggregate(
StreamBuilder::aggregateInitializer,
StreamBuilder::aggregateAdder,
StreamBuilder::aggregateSubtractor)
.to(...);
Run Code Online (Sandbox Code Playgroud)
目前,它有时会批量聚合,但不确定如何调整它:
{"Aggregate":[100]}
{"Aggregate":[100,300,301,302]}
{"Aggregate":[100,300,301,302,404]}
Run Code Online (Sandbox Code Playgroud)
我想知道如何控制聚合时间窗口,因此它不会为每个传入的消息发送消息,而是等待并聚合其中的一些消息.
这与Kafka Streams的窗口无法实现.一般来说,Kafka Streams窗口不会"关闭"或"结束",因为一旦窗口"关闭",你无法告诉它产生最终结果(没有这样的概念).这是为了适应迟到的结果.当消息到达聚合窗口时,您将看到更新.Kafka Streams吐出更新的频率取决于缓存(见下文).有关更多信息,请参阅:如何发送时间窗为KTable的最终kafka-streams聚合结果?
目前,它有时会批量聚合,但不确定如何调整它:
您在那里看到的最有可能是在商店中缓存的结果KTables.KTables仅在更改日志刷新并提交其偏移量时转发下游消息.这是为了在需要恢复状态时保持一致性.如果更改Kafka Streams的应用程序的提交间隔,则缓存刷新的频率会降低,因此您将看到从KTables 转发的更新更少(更改日志,聚合等).但那与窗口无关.
尽管如此,如果您想要对更改日志流进行窗口化聚合,您可以将其转换KTable为KStream使用KTable#toStream().然后,您可以在聚合步骤中指定窗口.
| 归档时间: |
|
| 查看次数: |
1418 次 |
| 最近记录: |