KafkaIO检查点 - 如何提交补偿卡夫卡

r3s*_*e55 5 scala apache-kafka google-cloud-dataflow apache-beam

我正在使用Google Dataflow中的Beam KafkaIO源运行作业,找不到在作业重新启动期间持久保存偏移的简单方法(作业更新选项不够,我需要重新启动作业)

将Beam的KafkaIO与PubSubIO进行比较(或者更精确地说是比较PubsubCheckpoint与KafkaCheckpointMark),我可以看到,在KafkaIO中并未实现检查点持久性(KafkaCheckpointMark.finalizeCheckpoint方法为空),而在PubsubCheckpoint.finalizeCheckpoint中实现了它,它对PubSub进行了确认。

这是否意味着我无法以最小的努力可靠地管理作业重启时的Kafka偏移量?

我到目前为止考虑的选项:

  1. 实现我自己的持久偏移逻辑-听起来很复杂,我在Scala中使用Beam尽管Scio。

  2. 不执行任何操作,但这将导致作业重新启动时出现许多重复(主题的保留期为30天)。

  3. 启用自动提交,但这会导致丢失消息,甚至更糟。

Rag*_*adi 5

有两个选项:在KafkaIO中启用commitOffsetsInFinalize()或在Kafka使用者配置中启用自动提交。请注意,虽然commitOffsetsInFinalize()与Kafka的自动提交相比,它与Beam中已处理的内容更加同步,但它并不能完全保证一次处理。想象一下有两个阶段的流水线,Dataflow在第一阶段之后完成Kafka阅读器的定稿,而无需等待第二阶段完成。如果您当时从头开始重新启动管道,则不会处理第一阶段完成但第二阶段尚未处理的记录。对于PubsubIO,该问题没有不同。

Regd选项(2):您可以将KafkaIO配置为从特定的时间戳开始读取(假设Kafka服务器支持该时间戳(版本10+))。但是看起来并没有启用auto_commit更好。

也就是说,KafkaIO应该支持最终确定。比启用auto_commit可能更容易使用(需要考虑频率等)。我们没有很多用户要求它。如果可以,请在user@beam.apache.org上提及。

[更新:我添加了对PR 4481中的 KafkaCheckpointMark提交偏移量的支持]