如何检测 KTable 连接的哪一侧触发了更新?

Edm*_*984 3 apache-kafka apache-kafka-streams

当您在 Kafka 中连接两个 KTable 时,每次更新两个 KTable 之一时,您的输出 Ktable 也会更新。

想象一下,您正在加入一个已适当减少的Customers列表。Orders再次想象一下,您使用此连接的结果来为最终客户提供特别优惠和建议:

  • 您可能想向他发送特别优惠,因为他更改了地址,并且他现在位于您销售产品 XYZ 的区域
  • 您可能想向他发送特别优惠,因为他的订单总额超过 1000 美元。

为了实现这一点,您需要知道每次连接在流上“发出”新记录时,连接的哪一侧确定了该新记录。处理此用例的适当解决方案是什么?

Mat*_*Sax 5

我认为有两种方法可以做到这一点:

  1. 在连接后使用连续的.transform()连接,将当前连接结果存储在存储中。如果您收到更新,您可以将新结果与旧结果进行比较,从而确定客户数据或订单数据是否发生更改。但这是一个内存密集型解决方案。
  2. transform()在连接之前使用 a (对于每个输入),并使用时间戳或偏移量信息来扩充记录。连接应保留此信息。因此,在结果中,与客户相比,订单的较大偏移量/时间戳告诉您订单已更新并触发了此结果。此解决方案占用的内存较少,但可能不是 100% 准确,具体取决于您的输入数据(使用偏移量可能根本不起作用,并且时间戳也可能很模糊,具体取决于数据更新的频率)。