小编Swi*_*ife的帖子

如何在 kafka 流中关闭窗口时发送主题记录

所以实际上我已经为此苦苦挣扎了几天。我正在使用 4 个主题的记录。我需要在 TimedWindow 上聚合记录。时间到了,我想向接收器主题发送已批准的消息或未批准的消息。这可能与 kafka 流有关吗?

即使窗口仍然打开,它似乎将每条记录下沉到新主题,这真的不是我想要的。

这是简单的代码:

 builder.stream(getTopicList(), Consumed.with(Serdes.ByteArray(), 
 Serdes.ByteArray()))
.flatMap(new ExceptionSafeKeyValueMapper<String, 
 FooTriggerMessage>("", Serdes.String(),
       fooTriggerSerde))
 .filter((key, value) -> value.getTriggerEventId() != null)
 .groupBy((key, value) -> value.getTriggerEventId().toString(),
       Serialized.with(Serdes.String(), fooTriggerSerde))

.windowedBy(TimeWindows.of(TimeUnit.SECONDS.toMillis(30))
.advanceBy(TimeUnit.SECONDS.toMillis(30)))

.aggregate(() -> new BarApprovalMessage(), /* initializer */
       (key, value, aggValue) -> getApproval(key, value, aggValue),/*adder*/
       Materialized
               .<String, BarApprovalMessage, WindowStore<Bytes, byte[]>>as(
                       storeName) /* state store name */
               .withValueSerde(barApprovalSerde))
.toStream().to(appProperties.getBarApprovalEngineOutgoing(), 
Produced.with(windowedSerde, barApprovalSerde));
Run Code Online (Sandbox Code Playgroud)

截至目前,每条记录都被下沉到传出主题,我只希望它在窗口关闭时发送一条消息,可以这么说。

这可能吗?

windowed apache-kafka-streams

3
推荐指数
2
解决办法
1310
查看次数

标签 统计

apache-kafka-streams ×1

windowed ×1