KStream-KTable join 写入 KTable:如何将 join 与 ktable 写入同步?

Cry*_*ark 5 apache-kafka-streams

我对以下拓扑的行为方式有一些问题:

String topic = config.topic();

KTable<UUID, MyData> myTable = topology.builder().table(UUIDSerdes.get(), GsonSerdes.get(MyData.class), topic);

// Receive a stream of various events
topology.eventsStream()
    // Only process events that are implementing MyEvent
    .filter((k, v) -> v instanceof MyEvent)
    // Cast to ease the code
    .mapValues(v -> (MyEvent) v)
    // rekey by data id
    .selectKey((k, v) -> v.data.id)
    .peek((k, v) -> L.info("Event:"+v.action))
    // join the event with the according entry in the KTable and apply the state mutation
    .leftJoin(myTable, eventHandler::handleEvent, UUIDSerdes.get(), EventSerdes.get())
    .peek((k, v) -> L.info("Updated:" + v.id + "-" + v.id2))
    // write the updated state to the KTable.
    .to(UUIDSerdes.get(), GsonSerdes.get(MyData.class), topic);
Run Code Online (Sandbox Code Playgroud)

当我同时收到不同的事件时,就会发生我的问题。由于我的状态突变是由 完成leftJoin然后由to方法编写的。如果使用相同的键同时收到事件 1 和 2,我可能会发生以下情况:

event1 joins with state A => state A mutated to state X
event2 joins with state A => state A mutated to state Y
state X written to the KTable topic
state Y written to the KTable topic
Run Code Online (Sandbox Code Playgroud)

因此,状态 Y 没有来自 的更改event1,因此我丢失了数据。

这是我所看到的日志(该Processing:...部分是从值连接器内部记录的):

Event:Event1
Event:Event2
Processing:Event1, State:none
Updated:1-null
Processing:Event2, State:none
java.lang.IllegalStateException: Event2 event received but we don't have data for id 1
Run Code Online (Sandbox Code Playgroud)

Event1可以认为是创建事件:它将在KTable中创建条目,因此状态是否为空都没有关系。Event2虽然需要将其更改应用于现有状态,但没有找到任何更改,因为第一个状态更改仍未写入 KTable(它仍未被该to方法处理)

有没有办法确保我的 leftJoin 和我对 ktable 的写入是原子完成的?

谢谢

更新和当前解决方案

感谢@Matthias 的回应,我能够使用Transformer.

代码如下所示:

那是变压器

event1 joins with state A => state A mutated to state X
event2 joins with state A => state A mutated to state Y
state X written to the KTable topic
state Y written to the KTable topic
Run Code Online (Sandbox Code Playgroud)

这是调整后的拓扑:

Event:Event1
Event:Event2
Processing:Event1, State:none
Updated:1-null
Processing:Event2, State:none
java.lang.IllegalStateException: Event2 event received but we don't have data for id 1
Run Code Online (Sandbox Code Playgroud)

当我们使用 KTable 的 KV StateStore 并通过put方法事件直接在其中应用更改时,应该始终获取更新的状态。我仍然想知道的一件事是:如果我有持续的高事件吞吐量怎么办。

我们对 KTable 的 KV 存储所做的 put 与在 KTable 的主题中完成的写入之间是否仍然存在竞争条件?

Mat*_*Sax 6

AKTable被分片到多个物理存储中,每个存储仅由单个线程更新。因此,您描述的场景不会发生。如果您有 2 条具有相同时间戳的记录,它们都更新同一个分片,则它们将一个接一个地处理(按偏移顺序)。因此,第二次更新将看到第一次更新后的状态。

所以也许你只是没有正确描述你的场景?

更新

进行连接时不能改变状态。因此,期望

event1 joins with state A => state A mutated to state X
Run Code Online (Sandbox Code Playgroud)

是错的。与任何处理顺序无关,当event1与 连接时state A,将以state A只读模式访问,state A不会被修改。

因此,当event2加入时,它将看到与 相同的状态event1。对于流表连接,表状态仅在从表输入主题读取新数据时更新。

如果您希望从两个输入更新共享状态,则需要使用transform()以下方法构建自定义解决方案:

event1 joins with state A => state A mutated to state X
Run Code Online (Sandbox Code Playgroud)

这将创建一个由两个处理器共享的存储,并且两者都可以根据需要进行读/写。因此,对于表输入,您可以只更新状态而不向下游发送任何内容,而对于流输入,您可以进行连接、更新状态并向下游发送结果。

更新 2

对于解决方案,Transformer应用到状态的更新和记录Transformer状态更新后的过程之间将不存在竞争条件。这部分将在单个线程中执行,并且记录将按照输入主题的偏移顺序进行处理。因此,可以确保状态更新可用于以后的记录。