卡夫卡消费者.commitSync vs commitAsync

gst*_*low 6 java offset apache-kafka kafka-consumer-api

引自https://www.safaribooksonline.com/library/view/kafka-the-definitive/9781491936153/ch04.html#callout_kafka_consumers__reading_data_from_kafka_CO2-1

缺点是,commitSync()将重试提交,直到它成功或遇到不可恢复的失败,commitAsync()将不会重试.

这句话对我来说并不清楚.我认为消费者向代理发送提交请求,如果代理在某些超时内没有响应,则意味着提交失败.我错了吗 ?

你能澄清的差异commitSync ,并commitAsync在细节?
另外,请提供我更喜欢哪种提交类型的用例.

flu*_*y03 13

正如API文档中所述:


这是一个同步提交,并将阻塞,直到提交成功或遇到不可恢复的错误(在这种情况下,它将被抛给调用者).

这意味着,这commitSync是一种阻止方法.调用它会阻塞你的线程,直到它成功或失败.

例如,

while (true) {
    ConsumerRecords<String, String> records = consumer.poll(100);
    for (ConsumerRecord<String, String> record : records) {
        System.out.printf("offset = %d, key = %s, value = %s", record.offset(), record.key(), record.value());
        consumer.commitSync();
    }
}
Run Code Online (Sandbox Code Playgroud)

对于for循环中的每次迭代,只有在consumer.commitSync()抛出异常成功返回或中断后,您的代码才会移动到下一次迭代.


这是一个异步调用,不会阻塞.遇到的任何错误都会传递给回调(如果提供)或被丢弃.

这意味着,这commitAsync是一种非阻塞方法.调用它不会阻止你的线程.相反,它将继续处理以下指令,无论它最终是成功还是失败.

例如,类似于前面的示例,但在这里我们使用commitAsync:

while (true) {
    ConsumerRecords<String, String> records = consumer.poll(100);
    for (ConsumerRecord<String, String> record : records) {
        System.out.printf("offset = %d, key = %s, value = %s", record.offset(), record.key(), record.value());
        consumer.commitAsync(callback);
    }
}
Run Code Online (Sandbox Code Playgroud)

对于for循环中的每次迭代,无论consumer.commitAsync()最终会发生什么,您的代码都将转移到下一次迭代.并且,提交的结果将由您定义的回调函数处理.


权衡:延迟与数据一致性

  • 如果必须确保数据一致性,请选择,commitSync()因为它将确保在执行任何进一步操作之前,您将知道偏移提交是成功还是失败.但由于它是同步和阻塞,您将花费更多时间等待提交完成,这会导致高延迟.
  • 如果您确定某些数据不一致并希望具有低延迟,请选择,commitAsync()因为它不会等待完成.相反,它将稍后发出提交请求并处理来自Kafka的响应(成功或失败),同时,您的代码将继续执行.

一般来说,实际行为将取决于您的实际代码以及您调用方法的位置.


mik*_*ike 10

使用 commitAsync() 进行稳健的重试处理

\n

在《Kafka - 权威指南》一书中,有关于如何缓解由于异步提交而导致提交较低偏移量的潜在问题的提示:

\n
\n

重试异步提交:为异步重试获得正确提交顺序的一个简单模式是使用单调递增的序列号。每次提交时增加序列号,并将提交时的序列号添加到 commitAsync 回调中。当你\xe2\x80\x99准备发送重试时,检查回调得到的提交序列号是否等于实例变量;如果是,则没有更新的提交,并且可以安全地重试。如果实例序列号较高,请不要重试,因为已发送较新的提交。

\n
\n

下面的代码描述了一个可能的解决方案:

\n
import java.util._\nimport java.time.Duration\nimport org.apache.kafka.clients.consumer.{ConsumerConfig, ConsumerRecord, KafkaConsumer, OffsetAndMetadata, OffsetCommitCallback}\nimport org.apache.kafka.common.{KafkaException, TopicPartition}\nimport collection.JavaConverters._\n\nobject AsyncCommitWithCallback extends App {\n\n  // define topic\n  val topic = "myOutputTopic"\n\n  // set properties\n  val props = new Properties()\n  props.put(ConsumerConfig.GROUP_ID_CONFIG, "AsyncCommitter")\n  props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092")\n  // [set more properties...]\n  \n\n  // create KafkaConsumer and subscribe\n  val consumer = new KafkaConsumer[String, String](props)\n  consumer.subscribe(List(topic).asJavaCollection)\n\n  // initialize global counter\n  val atomicLong = new AtomicLong(0)\n\n  // consume message\n  try {\n    while(true) {\n      val records = consumer.poll(Duration.ofMillis(1)).asScala\n\n      if(records.nonEmpty) {\n        for (data <- records) {\n          // do something with the records\n        }\n        consumer.commitAsync(new KeepOrderAsyncCommit)\n      }\n\n    }\n  } catch {\n    case ex: KafkaException => ex.printStackTrace()\n  } finally {\n    consumer.commitSync()\n    consumer.close()\n  }\n\n\n  class KeepOrderAsyncCommit extends OffsetCommitCallback {\n    // keeping position of this callback instance\n    val position = atomicLong.incrementAndGet()\n\n    override def onComplete(offsets: util.Map[TopicPartition, OffsetAndMetadata], exception: Exception): Unit = {\n      // retrying only if no other commit incremented the global counter\n      if(exception != null){\n        if(position == atomicLong.get) {\n          consumer.commitAsync(this)\n        }\n      }\n    }\n  }\n\n}\n
Run Code Online (Sandbox Code Playgroud)\n