Kafka KStream-KTable 加入竞争条件

Dav*_*ton 6 apache-kafka apache-kafka-streams

我有以下几点:

KTable<Integer, A> tableA = builder.table("A");
KStream<Integer, B> streamB = builder.stream("B");  
Run Code Online (Sandbox Code Playgroud)

流 B 中的消息需要使用表 A 中的数据进行丰富。

示例数据:

Topic A: (1, {name=john})
Topic B: (1, {type=create,...}), (1, {type=update,...}), (1, {type=update...})
Run Code Online (Sandbox Code Playgroud)

在一个完美的世界里,我想做

streamB.join(tableA, (b, a) -> { b.name = a.name; return b; })
       .selectKey((k,b) -> b.name)
       .to("C");
Run Code Online (Sandbox Code Playgroud)

不幸的是,这对我不起作用,因为我的数据是这样的,每次将消息写入主题 A 时,也会将相应的消息写入主题 B(源是单个 DB 事务)。现在,在此初始“创建”事务之后,主题 B 将继续接收更多消息。有时每秒会在主题 B 上显示多个事件,但对于给定的键,也可能有几个小时的连续事件。

简单的解决方案不起作用的原因是原始的“创建”事务会导致竞争条件:主题 A 和 B 几乎同时获取他们的消息,如果 B 消息首先到达拓扑的“加入”部分(比如几毫秒)在 A 消息到达那里之前)tableA 还不会包含相应的条目。此时事件丢失。我可以在主题 C 上看到这种情况:有些事件出现,有些没有(如果我使用 leftJoin,所有事件都会出现,但有些具有空键,相当于丢失)。这只是初始“创建”事务的问题。之后每次事件到达主题 B 时,表 A 中都存在相应的条目。

所以我的问题是:你如何解决这个问题?

我目前的解决方案是丑陋的。我所做的是创建了一个“B 集合”并使用以下内容阅读主题 B

B.groupByKey()
 .aggregate(() -> new CollectionOfB(), (id, b, agg) -> agg.add(b));
 .join(tableA, ...);
Run Code Online (Sandbox Code Playgroud)

现在我们有一个 KTable-KTable 连接,它不受这种竞争条件的影响。我认为这是“丑陋”的原因是因为在每次加入后,我必须向主题 B 发送一条特殊消息,该消息基本上说“从集合中删除我刚刚处理的事件”。如果此特殊消息未发送到主题 B,则集合将继续增长,并且每次加入时都会报告集合中的每个事件。

目前我正在调查窗口连接是否有效(将 A 和 B 读入 KStreams 并使用窗口连接)。我不确定这是否会起作用,因为窗口大小没有上限。我想说,“窗口从 'before' 开始 1 秒并在 'after' 无限秒结束”。即使我能以某种方式完成这项工作,我还是有点担心拥有无界窗口的空间要求。

任何建议将不胜感激。

Mat*_*Sax 4

不确定您使用的是哪个版本,但最新的 Kafka 2.1 改进了流表连接。即使在 2.1 之前,以下内容也成立:

  • 流表连接基于事件时间
  • Kafka Streams 基于事件时间处理消息,但是按照偏移顺序(对于两个输入流,首先处理记录时间戳较小的流)
  • 如果要保证表先更新,表更新记录的时间戳应该比流记录小

从 2.1 开始:

  • 为了允许一些延迟,您可以配置max.task.idle.ms配置以在只有一个输入主题有输入数据的情况下延迟处理

事件时间处理顺序在 2.0 及更早版本中以尽力而为的方式实现,这可能会导致您描述的竞争条件。在 2.1 中,处理顺序是有保证的,并且只有在max.task.idle.ms命中时才可能被违反。

详细信息请参见https://cwiki.apache.org/confluence/display/KAFKA/KIP-353%3A+Improve+Kafka+Streams+Timestamp+Synchronization

  • 您可以通过“StreamsBuilder#table(..., Consumed.with(TimestampExtractor))”“移动”时间戳,为表输入主题提供自定义“TimestampExtractor”。 (2认同)