Edm*_*984 3 apache-kafka apache-kafka-streams
当您在 Kafka 中连接两个 KTable 时,每次更新两个 KTable 之一时,您的输出 Ktable 也会更新。
想象一下,您正在加入一个已适当减少的Customers列表。Orders再次想象一下,您使用此连接的结果来为最终客户提供特别优惠和建议:
为了实现这一点,您需要知道每次连接在流上“发出”新记录时,连接的哪一侧确定了该新记录。处理此用例的适当解决方案是什么?
我认为有两种方法可以做到这一点:
.transform()连接,将当前连接结果存储在存储中。如果您收到更新,您可以将新结果与旧结果进行比较,从而确定客户数据或订单数据是否发生更改。但这是一个内存密集型解决方案。transform()在连接之前使用 a (对于每个输入),并使用时间戳或偏移量信息来扩充记录。连接应保留此信息。因此,在结果中,与客户相比,订单的较大偏移量/时间戳告诉您订单已更新并触发了此结果。此解决方案占用的内存较少,但可能不是 100% 准确,具体取决于您的输入数据(使用偏移量可能根本不起作用,并且时间戳也可能很模糊,具体取决于数据更新的频率)。