在spring-kafka中处理消费者错误后提交偏移量

maw*_*wek 3 apache-kafka kafka-consumer-api spring-kafka

我不完全理解消费者错误处理如何与提交偏移量和 akcMode 一起工作,以及它如何受到错误停止容器的影响(使用spring-kafka 1.3.*)。

假设我有两个消费者(消耗两个分区),他们在轮询 ( max.records.per.poll=5)时都从他们的分区中获取 5 个事件。

第一个消费者 - 第一个事件处理正常,处理第二个事件失败 - 所以在我调用的错误处理程序中kafkaListenerEndpointRegistry.stop(),但由于实现了停止,它只是停止消费者轮询,两个消费者仍然完成处理他们当前的批次。因此,第一个消费者处理事件 3、4、5(所有这些都处理没有错误),假设第二个消费者在第 4 个事件上失败(事件 1、2、3、5 处理正常)。我的问题是将为每个消费者提交哪些偏移量?

我的理解是:

  • 当我AckMode.RECORD/BATCH结合使用时ackOnError- 将为两个消费者提交最新的偏移量(5)
  • 当我AckMode.RECORD/BATCH与 with 结合使用时!ackOnError- 也会为两个消费者提交最新的偏移量 - 因为尽管在处理批处理期间某些事件失败,但批次中最新处理的事件还可以,因此最新处理的事件偏移量获胜。

我的理解正确吗?

Gar*_*ell 5

你的理解是正确的;当您停止容器时,您还应该向侦听器发出信号,表明它也需要拒绝任何剩余的记录,这样它们的偏移量就不会被提交。

我们正在考虑添加一个stopNow()方法,该方法将阻止向侦听器发送额外的记录。

在 2.0 中,我们添加了RemainingRecordsErrorHandler(和一个实现,SeekToCurrentErrorHandler)。当容器检测到这样的错误处理程序时,它会将剩余的记录呈现给错误处理程序而不是侦听器。

SeekToCurrentErrorHandler追求大家的话题/分区未处理的偏移量(包括失败的记录),所以他们都在接下来的调查中检索。

自定义实现可能会寻找剩余的记录,但将失败的记录发送到死信主题(或以其他方式处理它)。

也就是说,stopNow()对于大多数人来说可能更容易处理,但它可能只是 2.2 功能;1.3.x 用户需要在失败后丢弃/拒绝未处理的记录。

您还可以使用 a RetryingMessageListenerAdapter(或启用重试,如果使用@KafkaListener),它将根据其重试配置重试交付,根本不涉及 Kafka。失败的记录可以RecoveryCallback在重试用完后通过 a 处理,然后提交其偏移量;在这种情况下不需要停止容器。