Kafka Streams:如何确保处理完成后提交偏移量

Jay*_*iya 6 apache-kafka apache-kafka-streams

我想使用 Kafka 流处理 Kafka 主题中存在的消息。

处理的最后一步是将结果放入数据库表中。为了避免与数据库争用相关的问题(程序将 24*7 运行并处理数百万条消息),我将使用批处理进行 JDBC 调用。

但在这种情况下,有可能消息丢失(在一个场景中,我从一个主题读取了500条消息,流将标记偏移量,现在程序失败。JDBC批量更新中存在的消息丢失,但偏移量被标记为那些消息)。

我想在数据库插入/更新完成后手动标记最后一条消息的偏移量,但根据以下问题这是不可能的:如何使用 Kafka Stream 手动提交?

有人可以建议任何可能的解决方案吗

sun*_*007 3

Kafka Stream不支持手动提交,同时也不支持批处理。对于您的用例,有几种可能性:

  1. 使用Normal Consumer并实现批量处理并控制手动偏移。

  2. 使用 Spark Kafka 结构化流,如下所示 Kafka Spark 结构化流

  3. 尝试 Spring Kafka [ Spring Kafka ] 2

  4. 在这种情况下,也可以考虑 JDBC Kafka Connector。卡夫卡 JDBC 连接器