我正在使用 Kafka 流 2.2.1。
我正在使用抑制来阻止事件直到窗口关闭。我正在使用事件时间语义。但是,只有在流上有新消息可用时才会触发触发消息。
提取以下代码以对问题进行采样:
KStream<UUID, String>[] branches = is
.branch((key, msg) -> "a".equalsIgnoreCase(msg.split(",")[1]),
(key, msg) -> "b".equalsIgnoreCase(msg.split(",")[1]),
(key, value) -> true);
KStream<UUID, String> sideA = branches[0];
KStream<UUID, String> sideB = branches[1];
KStream<Windowed<UUID>, String> sideASuppressed =
sideA.groupByKey(
Grouped.with(new MyUUIDSerde(),
Serdes.String()))
.windowedBy(TimeWindows.of(Duration.ofMinutes(31)).grace(Duration.ofMinutes(32)))
.reduce((v1, v2) -> {
return v1;
})
.suppress(Suppressed.untilWindowCloses(Suppressed.BufferConfig.unbounded()))
.toStream();
Run Code Online (Sandbox Code Playgroud)
当新消息到达“sideA”流时,消息仅从“sideASuppressed”流式传输(到达“sideB”的消息不会导致抑制器发出任何消息,即使窗口关闭时间已经过去很久了)。尽管在生产中,由于容量大,问题可能不会发生太多,但在很多情况下,必须不要等待进入“sideA”流的新消息。
提前致谢。