Dav*_*ton 6 apache-kafka apache-kafka-streams
我有以下几点:
KTable<Integer, A> tableA = builder.table("A");
KStream<Integer, B> streamB = builder.stream("B");
Run Code Online (Sandbox Code Playgroud)
流 B 中的消息需要使用表 A 中的数据进行丰富。
示例数据:
Topic A: (1, {name=john})
Topic B: (1, {type=create,...}), (1, {type=update,...}), (1, {type=update...})
Run Code Online (Sandbox Code Playgroud)
在一个完美的世界里,我想做
streamB.join(tableA, (b, a) -> { b.name = a.name; return b; })
.selectKey((k,b) -> b.name)
.to("C");
Run Code Online (Sandbox Code Playgroud)
不幸的是,这对我不起作用,因为我的数据是这样的,每次将消息写入主题 A 时,也会将相应的消息写入主题 B(源是单个 DB 事务)。现在,在此初始“创建”事务之后,主题 B 将继续接收更多消息。有时每秒会在主题 B 上显示多个事件,但对于给定的键,也可能有几个小时的连续事件。
简单的解决方案不起作用的原因是原始的“创建”事务会导致竞争条件:主题 A 和 B 几乎同时获取他们的消息,如果 B 消息首先到达拓扑的“加入”部分(比如几毫秒)在 A 消息到达那里之前)tableA 还不会包含相应的条目。此时事件丢失。我可以在主题 C 上看到这种情况:有些事件出现,有些没有(如果我使用 leftJoin,所有事件都会出现,但有些具有空键,相当于丢失)。这只是初始“创建”事务的问题。之后每次事件到达主题 B 时,表 A 中都存在相应的条目。
所以我的问题是:你如何解决这个问题?
我目前的解决方案是丑陋的。我所做的是创建了一个“B 集合”并使用以下内容阅读主题 B
B.groupByKey()
.aggregate(() -> new CollectionOfB(), (id, b, agg) -> agg.add(b));
.join(tableA, ...);
Run Code Online (Sandbox Code Playgroud)
现在我们有一个 KTable-KTable 连接,它不受这种竞争条件的影响。我认为这是“丑陋”的原因是因为在每次加入后,我必须向主题 B 发送一条特殊消息,该消息基本上说“从集合中删除我刚刚处理的事件”。如果此特殊消息未发送到主题 B,则集合将继续增长,并且每次加入时都会报告集合中的每个事件。
目前我正在调查窗口连接是否有效(将 A 和 B 读入 KStreams 并使用窗口连接)。我不确定这是否会起作用,因为窗口大小没有上限。我想说,“窗口从 'before' 开始 1 秒并在 'after' 无限秒结束”。即使我能以某种方式完成这项工作,我还是有点担心拥有无界窗口的空间要求。
任何建议将不胜感激。
不确定您使用的是哪个版本,但最新的 Kafka 2.1 改进了流表连接。即使在 2.1 之前,以下内容也成立:
从 2.1 开始:
max.task.idle.ms配置以在只有一个输入主题有输入数据的情况下延迟处理事件时间处理顺序在 2.0 及更早版本中以尽力而为的方式实现,这可能会导致您描述的竞争条件。在 2.1 中,处理顺序是有保证的,并且只有在max.task.idle.ms命中时才可能被违反。
| 归档时间: |
|
| 查看次数: |
1499 次 |
| 最近记录: |