Kafka: replicating offset commits asynchronously

ste*_*rnr 9 apache-kafka

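We occasionally see high latency between the replica leader and the other ISR nodes, which causes consumers to receive the following error: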

org.apache.kafka.clients.consumer.RetriableCommitFailedException: Commit offsets failed with retriable exception. You should retry committing offsets.
Caused by: org.apache.kafka.common.errors.TimeoutException: The request timed out.

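I could increase offsets.commit.timeout.ms, but I would rather not, since it may have additional side effects. More broadly, I don't want the broker to wait until the offset commit has been synchronized to all other replicas; I would prefer that it commit locally and update the remaining replicas asynchronously. Going through the broker configuration I found offsets.commit.required.acks, which looks like exactly the right setting, but the documentation also says, rather cryptically: the default (-1) should not be overridden.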

Why? I even tried looking at the broker source code, but found little additional information.

Any idea why overriding it is not recommended? Is there a different way to achieve the same result?

mik*_*ike 0

I would suggest actually retrying the offset commit.


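Have your consumer commit offsets asynchronously and implement a retry mechanism. Be aware, however, that retrying an asynchronous commit can result in a smaller offset being committed after a larger one has already been committed, which must be avoided.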


The book "Kafka: The Definitive Guide" has a hint on how to mitigate this problem:


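Retrying Async Commits: A simple pattern to get commit order right for asynchronous retries is to use a monotonically increasing sequence number. Increase the sequence number every time you commit and add the sequence number at the time of the commit to the commitAsync callback. When you're getting ready to send a retry, check if the commit sequence number the callback got is equal to the instance variable; if it is, there was no newer commit and it is safe to retry. If the instance sequence number is higher, don't retry because a newer commit was already sent.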


As an example, here is an implementation of this idea in Scala:

import java.time.Duration
import java.util.{Map => JMap, Properties}
import java.util.concurrent.atomic.AtomicLong

import org.apache.kafka.clients.consumer.{ConsumerConfig, KafkaConsumer, OffsetAndMetadata, OffsetCommitCallback}
import org.apache.kafka.common.{KafkaException, TopicPartition}

// on Scala 2.13+ prefer scala.jdk.CollectionConverters._
import scala.collection.JavaConverters._

object AsyncCommitWithCallback extends App {

  // define topic
  val topic = "myOutputTopic"

  // set properties
  val props = new Properties()
  props.put(ConsumerConfig.GROUP_ID_CONFIG, "AsyncCommitter5")
  props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092")
  // [set more properties...]

  // create KafkaConsumer and subscribe
  val consumer = new KafkaConsumer[String, String](props)
  consumer.subscribe(List(topic).asJavaCollection)

  // global sequence number, incremented once for every new commit
  val atomicLong = new AtomicLong(0)

  // consume messages
  try {
    while (true) {
      val records = consumer.poll(Duration.ofMillis(1)).asScala

      if (records.nonEmpty) {
        for (data <- records) {
          // do something with the records
        }
        consumer.commitAsync(new KeepOrderAsyncCommit)
      }
    }
  } catch {
    case ex: KafkaException => ex.printStackTrace()
  } finally {
    consumer.commitSync()
    consumer.close()
  }

  class KeepOrderAsyncCommit extends OffsetCommitCallback {
    // sequence number captured when this callback instance is created
    val position = atomicLong.incrementAndGet()

    override def onComplete(offsets: JMap[TopicPartition, OffsetAndMetadata], exception: Exception): Unit = {
      // retry only if the commit failed and no newer commit has
      // incremented the global counter in the meantime
      if (exception != null && position == atomicLong.get) {
        // retry the same offsets that failed, reusing this callback
        consumer.commitAsync(offsets, this)
      }
    }
  }
}
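The retry check itself does not depend on Kafka, so it can be pulled out and tried on its own. The following is only a minimal sketch of the sequence-number guard from the book quote; `CommitSequence`, `next` and `mayRetry` are hypothetical names chosen for illustration and are not part of the Kafka client API.

```scala
import java.util.concurrent.atomic.AtomicLong

// Hypothetical helper (not a Kafka class): remembers the sequence
// number of the most recent commit attempt.
final class CommitSequence {
  private val latest = new AtomicLong(0L)

  // Call once per new commit; the returned number is stored in that commit's callback.
  def next(): Long = latest.incrementAndGet()

  // A failed commit may be retried only if no newer commit has been issued since.
  def mayRetry(seq: Long): Boolean = seq == latest.get()
}

object CommitSequenceDemo {
  def main(args: Array[String]): Unit = {
    val sequence = new CommitSequence

    val first = sequence.next()         // commit #1 is sent
    println(sequence.mayRetry(first))   // nothing newer yet, retry allowed

    val second = sequence.next()        // commit #2 is sent before #1 reports failure
    println(sequence.mayRetry(first))   // #1 is stale now, retry must be skipped
    println(sequence.mayRetry(second))  // #2 is the latest, retry allowed
  }
}
```

In a consumer, the callback would capture the value returned by `next()` when the commit is issued and call `mayRetry` with it on failure, the same way `position` is compared against `atomicLong` above.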