era*_*nkl 5 apache-kafka-streams
我正在使用 Kafka 流 2.2.1。
我正在使用抑制来阻止事件直到窗口关闭。我正在使用事件时间语义。但是,只有在流上有新消息可用时才会触发触发消息。
提取以下代码以对问题进行采样:
KStream<UUID, String>[] branches = is
.branch((key, msg) -> "a".equalsIgnoreCase(msg.split(",")[1]),
(key, msg) -> "b".equalsIgnoreCase(msg.split(",")[1]),
(key, value) -> true);
KStream<UUID, String> sideA = branches[0];
KStream<UUID, String> sideB = branches[1];
KStream<Windowed<UUID>, String> sideASuppressed =
sideA.groupByKey(
Grouped.with(new MyUUIDSerde(),
Serdes.String()))
.windowedBy(TimeWindows.of(Duration.ofMinutes(31)).grace(Duration.ofMinutes(32)))
.reduce((v1, v2) -> {
return v1;
})
.suppress(Suppressed.untilWindowCloses(Suppressed.BufferConfig.unbounded()))
.toStream();
Run Code Online (Sandbox Code Playgroud)
当新消息到达“sideA”流时,消息仅从“sideASuppressed”流式传输(到达“sideB”的消息不会导致抑制器发出任何消息,即使窗口关闭时间已经过去很久了)。尽管在生产中,由于容量大,问题可能不会发生太多,但在很多情况下,必须不要等待进入“sideA”流的新消息。
提前致谢。
根据 Kafka 流文档:
仅当所有输入主题上的所有输入分区都有可用的新数据(具有更新的时间戳)时,流时间才会提前。如果至少一个分区没有任何可用的新数据,则流时间将不会提前,因此如果指定了 PunctuationType.STREAM_TIME,则不会触发 punctuate()。此行为与配置的时间戳提取器无关,即,使用 WallclockTimestampExtractor 不会启用 punctuate() 的挂钟触发。
我不确定为什么会出现这种情况,但是,它解释了为什么只有当消息在它使用的队列中可用时才会发出抑制的消息。
如果有人对为什么要这样实施有答案,我将很乐意学习。这种行为导致我的实现发出消息只是为了让我的抑制消息及时发出,并导致代码的可读性大大降低。
| 归档时间: |
|
| 查看次数: |
643 次 |
| 最近记录: |