所以实际上我已经为此苦苦挣扎了几天。我正在使用 4 个主题的记录。我需要在 TimedWindow 上聚合记录。时间到了,我想向接收器主题发送已批准的消息或未批准的消息。这可能与 kafka 流有关吗?
即使窗口仍然打开,它似乎将每条记录下沉到新主题,这真的不是我想要的。
这是简单的代码:
builder.stream(getTopicList(), Consumed.with(Serdes.ByteArray(),
Serdes.ByteArray()))
.flatMap(new ExceptionSafeKeyValueMapper<String,
FooTriggerMessage>("", Serdes.String(),
fooTriggerSerde))
.filter((key, value) -> value.getTriggerEventId() != null)
.groupBy((key, value) -> value.getTriggerEventId().toString(),
Serialized.with(Serdes.String(), fooTriggerSerde))
.windowedBy(TimeWindows.of(TimeUnit.SECONDS.toMillis(30))
.advanceBy(TimeUnit.SECONDS.toMillis(30)))
.aggregate(() -> new BarApprovalMessage(), /* initializer */
(key, value, aggValue) -> getApproval(key, value, aggValue),/*adder*/
Materialized
.<String, BarApprovalMessage, WindowStore<Bytes, byte[]>>as(
storeName) /* state store name */
.withValueSerde(barApprovalSerde))
.toStream().to(appProperties.getBarApprovalEngineOutgoing(),
Produced.with(windowedSerde, barApprovalSerde));
Run Code Online (Sandbox Code Playgroud)
截至目前,每条记录都被下沉到传出主题,我只希望它在窗口关闭时发送一条消息,可以这么说。
这可能吗?