带有抑制的 Kafka SessionWindow 仅在有稳定的输入记录流时发送最终事件

Ant*_*era 5 apache-kafka-streams

如果没有恒定的输入记录流,则看起来好像带有宽限期和抑制的会话窗口的 Kafka Stream 无法输出最终事件。

上下文:我们正在使用变更数据捕获 (CDC) 来监控对旧数据库的更改。当用户使用 UI 进行更改时,数据库事务将更改 1..n 个表。每个 SQL 语句都会生成一条 Kafka 记录。这些需要聚合以创建一个“触发记录”,用于启动一个昂贵的过程。该过程应在提交遗留数据库中事务的一秒钟内启动。只有少数用户使用旧应用程序,因此事务之间可能会有大量时间。

我们有一个 Kafka Stream 应用程序,它使用会话窗口和 400 毫秒的不活动间隔来聚合共享相同密钥(事务 ID)的传入记录,并输出触发记录。

我们有一个可行的解决方案,但只要其他事务正在运行,触发器记录只会写入输出主题,以便生成稳定的传入记录流。我们需要关闭窗口并写入触发记录,即使没有进一步的输入记录。

工作代码在这里:https : //github.com/maxant/kafka-data-consistency/blob/714a44689fd48aa28b05b855638ac7ed50dd5ee9/partners/src/main/java/ch/maxant/kdc/partners/ThroughputTest5Stream.java#L6

以下是该代码的摘要:

      stream.groupByKey()
            .windowedBy(SessionWindows.with(Duration.ofMillis(400)).grace(Duration.ofMillis(0)))
            .aggregate(...)
            .suppress(Suppressed.untilWindowCloses(Suppressed.BufferConfig.unbounded()))
            .toStream((k,v) -> k.key())
            .to("throughput-test-aggregated");
Run Code Online (Sandbox Code Playgroud)

最初我没有压制,也没有宽限期。仅使用默认配置,我总是收到包含所有聚合记录的窗口的最终事件,但在 400 毫秒窗口之后最多需要 6 秒,这对我们来说太长了,无法等待。

为了减少延迟并加快速度,我将 CACHE_MAX_BYTES_BUFFERING_CONFIG 设置为 1,但这会在每次聚合后导致输出记录,而不仅仅是单个输出记录。

我引入了抑制(并随之引入了 0 毫秒的宽限期),以确保仅创建一个输出记录。

现在的问题是我只收到一个输出记录,如果新的输入记录在窗口关闭后到达(不管它们的键)。

该测试创建了 10 个输入记录,所有输入记录都具有相同的键,相隔 10 毫秒,都在 100 毫秒内。然后它休息 3 秒钟,让我在一组 10 个记录后将其关闭。我希望收到一个输出记录,但没有到达,除非我让测试运行,以创建第二组输入记录。这个问题是可重现的。

我已阅读以下文章,但找不到任何描述我所看到的内容的内容,即最终记录仅在处理附加记录(无论键值如何)后才会发送到输出主题。

我必须更改什么才能将最终记录发送到我的输出主题,即使没有进一步处理记录?

(在 Linux 上将 Kafka 2.4.1 与客户端和服务器一起使用)

Tuy*_*ong 4

更新:我的拓扑有错误,已修复

在使用抑制时,我遇到了与您完全相同的问题,这是预期的行为。因为抑制仅支持使用流时间而不是挂钟时间发出缓冲记录,所以如果停止获取新记录,流时间将被冻结并且Suppress不会发出最后一个抑制窗口。

我使用的解决方案是使用 Processor API 编写自定义抑制(使用 Transfomer,以便您可以使用 DSL 将抑制的记录发送到下游),并将状态存储用作缓冲区,然后检查哪些窗口应该刷新(或发出)到下游每当有新记录进入或经过一段时间(使用WALL_CLOCK_TIME标点符号)后处理器。

变压器看起来像这样:

public class SuppressWindowTransformer implements Transformer<Windowed<String>, String, KeyValue<Windowed<String>, String>> {
    private ProcessorContext context;
    private Cancellable cancellable;
    private KeyValueStore<Windowed<String>, String> kvStore;
    @Override
    public void init(ProcessorContext context) {
        this.context = context;
        kvStore = (KeyValueStore) context.getStateStore("suppressed_windowed_transaction");
        cancellable = context.schedule(Duration.ofMillis(100), PunctuationType.WALL_CLOCK_TIME, timestamp -> flushOldWindow());
    }

    @Override
    public KeyValue<Windowed<String>, String> transform(Windowed<String> key, String value) {
        kvStore.put(key, value);//buffer (or suppress) the new in coming window
        flushOldWindow();
        return null;
    }

    private void flushOldWindow() {
        //your logic to check for old windows in kvStore then flush

        //forward (or unsuppressed) your suppressed records downstream using ProcessorContext.forward(key, value)
    }

    @Override
    public void close() {
        cancellable.cancel();//cancel punctuate
    }
}
Run Code Online (Sandbox Code Playgroud)

在您的 Stream DSL 中:

stream.groupByKey()
            .windowedBy(SessionWindows.with(Duration.ofMillis(400)).grace(Duration.ofMillis(0)))
            .aggregate(...)//remove suppress operator and write custom suppress using processor API
            .toStream()
            .transform(SuppressWindowTransformer::new, "suppressed_windowed_transaction")
            .to("throughput-test-aggregated");
Run Code Online (Sandbox Code Playgroud)