Ant*_*era 5 apache-kafka-streams
如果没有恒定的输入记录流,则看起来好像带有宽限期和抑制的会话窗口的 Kafka Stream 无法输出最终事件。
上下文:我们正在使用变更数据捕获 (CDC) 来监控对旧数据库的更改。当用户使用 UI 进行更改时,数据库事务将更改 1..n 个表。每个 SQL 语句都会生成一条 Kafka 记录。这些需要聚合以创建一个“触发记录”,用于启动一个昂贵的过程。该过程应在提交遗留数据库中事务的一秒钟内启动。只有少数用户使用旧应用程序,因此事务之间可能会有大量时间。
我们有一个 Kafka Stream 应用程序,它使用会话窗口和 400 毫秒的不活动间隔来聚合共享相同密钥(事务 ID)的传入记录,并输出触发记录。
我们有一个可行的解决方案,但只要其他事务正在运行,触发器记录只会写入输出主题,以便生成稳定的传入记录流。我们需要关闭窗口并写入触发记录,即使没有进一步的输入记录。
工作代码在这里:https : //github.com/maxant/kafka-data-consistency/blob/714a44689fd48aa28b05b855638ac7ed50dd5ee9/partners/src/main/java/ch/maxant/kdc/partners/ThroughputTest5Stream.java#L6
以下是该代码的摘要:
stream.groupByKey()
.windowedBy(SessionWindows.with(Duration.ofMillis(400)).grace(Duration.ofMillis(0)))
.aggregate(...)
.suppress(Suppressed.untilWindowCloses(Suppressed.BufferConfig.unbounded()))
.toStream((k,v) -> k.key())
.to("throughput-test-aggregated");
Run Code Online (Sandbox Code Playgroud)
最初我没有压制,也没有宽限期。仅使用默认配置,我总是收到包含所有聚合记录的窗口的最终事件,但在 400 毫秒窗口之后最多需要 6 秒,这对我们来说太长了,无法等待。
为了减少延迟并加快速度,我将 CACHE_MAX_BYTES_BUFFERING_CONFIG 设置为 1,但这会在每次聚合后导致输出记录,而不仅仅是单个输出记录。
我引入了抑制(并随之引入了 0 毫秒的宽限期),以确保仅创建一个输出记录。
现在的问题是我只收到一个输出记录,如果新的输入记录在窗口关闭后到达(不管它们的键)。
该测试创建了 10 个输入记录,所有输入记录都具有相同的键,相隔 10 毫秒,都在 100 毫秒内。然后它休息 3 秒钟,让我在一组 10 个记录后将其关闭。我希望收到一个输出记录,但没有到达,除非我让测试运行,以创建第二组输入记录。这个问题是可重现的。
我已阅读以下文章,但找不到任何描述我所看到的内容的内容,即最终记录仅在处理附加记录(无论键值如何)后才会发送到输出主题。
我必须更改什么才能将最终记录发送到我的输出主题,即使没有进一步处理记录?
(在 Linux 上将 Kafka 2.4.1 与客户端和服务器一起使用)
更新:我的拓扑有错误,已修复
在使用抑制时,我遇到了与您完全相同的问题,这是预期的行为。因为抑制仅支持使用流时间而不是挂钟时间发出缓冲记录,所以如果停止获取新记录,流时间将被冻结并且Suppress不会发出最后一个抑制窗口。
我使用的解决方案是使用 Processor API 编写自定义抑制(使用 Transfomer,以便您可以使用 DSL 将抑制的记录发送到下游),并将状态存储用作缓冲区,然后检查哪些窗口应该刷新(或发出)到下游每当有新记录进入或经过一段时间(使用WALL_CLOCK_TIME标点符号)后处理器。
变压器看起来像这样:
public class SuppressWindowTransformer implements Transformer<Windowed<String>, String, KeyValue<Windowed<String>, String>> {
private ProcessorContext context;
private Cancellable cancellable;
private KeyValueStore<Windowed<String>, String> kvStore;
@Override
public void init(ProcessorContext context) {
this.context = context;
kvStore = (KeyValueStore) context.getStateStore("suppressed_windowed_transaction");
cancellable = context.schedule(Duration.ofMillis(100), PunctuationType.WALL_CLOCK_TIME, timestamp -> flushOldWindow());
}
@Override
public KeyValue<Windowed<String>, String> transform(Windowed<String> key, String value) {
kvStore.put(key, value);//buffer (or suppress) the new in coming window
flushOldWindow();
return null;
}
private void flushOldWindow() {
//your logic to check for old windows in kvStore then flush
//forward (or unsuppressed) your suppressed records downstream using ProcessorContext.forward(key, value)
}
@Override
public void close() {
cancellable.cancel();//cancel punctuate
}
}
Run Code Online (Sandbox Code Playgroud)
在您的 Stream DSL 中:
stream.groupByKey()
.windowedBy(SessionWindows.with(Duration.ofMillis(400)).grace(Duration.ofMillis(0)))
.aggregate(...)//remove suppress operator and write custom suppress using processor API
.toStream()
.transform(SuppressWindowTransformer::new, "suppressed_windowed_transaction")
.to("throughput-test-aggregated");
Run Code Online (Sandbox Code Playgroud)
| 归档时间: |
|
| 查看次数: |
766 次 |
| 最近记录: |