小编era*_*nkl的帖子

仅在流上收到新事件时才抑制触发事件

我正在使用 Kafka 流 2.2.1。

我正在使用抑制来阻止事件直到窗口关闭。我正在使用事件时间语义。但是,只有在流上有新消息可用时才会触发触发消息。

提取以下代码以对问题进行采样:

        KStream<UUID, String>[] branches = is
            .branch((key, msg) -> "a".equalsIgnoreCase(msg.split(",")[1]),
                    (key, msg) -> "b".equalsIgnoreCase(msg.split(",")[1]),
                    (key, value) -> true);

    KStream<UUID, String> sideA = branches[0];
    KStream<UUID, String> sideB = branches[1];

    KStream<Windowed<UUID>, String> sideASuppressed =
            sideA.groupByKey(
                    Grouped.with(new MyUUIDSerde(),
                    Serdes.String()))
            .windowedBy(TimeWindows.of(Duration.ofMinutes(31)).grace(Duration.ofMinutes(32)))
            .reduce((v1, v2) -> {
                return v1;
            })
            .suppress(Suppressed.untilWindowCloses(Suppressed.BufferConfig.unbounded()))
            .toStream();
Run Code Online (Sandbox Code Playgroud)

当新消息到达“sideA”流时,消息仅从“sideASuppressed”流式传输(到达“sideB”的消息不会导致抑制器发出任何消息,即使窗口关闭时间已经过去很久了)。尽管在生产中,由于容量大,问题可能不会发生太多,但在很多情况下,必须不要等待进入“sideA”流的新消息。

提前致谢。

apache-kafka-streams

5
推荐指数
1
解决办法
643
查看次数

标签 统计

apache-kafka-streams ×1