KafkaProducer.close() 和 KafkaProducer.flush() 之间的区别

Jav*_*era 6 apache-kafka kafka-producer-api

查看文档,我不确定我是否理解使用close()lush()之间的区别。

这是flush()的文档

 * Invoking this method makes all buffered records immediately available to send (even if <code>linger.ms</code> is
 * greater than 0) and blocks on the completion of the requests associated with these records. The post-condition
 * of <code>flush()</code> is that any previously sent record will have completed (e.g. <code>Future.isDone() == true</code>).
 * A request is considered completed when it is successfully acknowledged
 * according to the <code>acks</code> configuration you have specified or else it results in an error.
Run Code Online (Sandbox Code Playgroud)

以及 close() 的文档:

 * This method waits up to <code>timeout</code> for the producer to complete the sending of all incomplete requests.
 * If the producer is unable to complete all requests before the timeout expires, this method will fail
 * any unsent and unacknowledged records immediately.
Run Code Online (Sandbox Code Playgroud)

这是否意味着:

  1. 如果我使用 close() 并且内存缓冲区中有一些待处理的记录,它们甚至不会被尝试(与刷新相比,刷新会尝试发送它们)?
  2. 如果我使用flush(),如果重试次数很大,它可能会“永远”阻塞?当使用 close() 时,我有一个需要多长时间的上限?

我想如果我在 1. 中是正确的,那么 acks=0 的生产者将获得一条记录的确认,如果该记录“不幸”被放置在内存队列中,则该记录甚至可能不会被尝试发布,并且调用 close() 后立即执行。

var*_*shy 3

如果你想继续使用生产者并等待消息发送,你可以使用flush else close。关闭超时值将等待消息发送并根据配置接收确认,直到超时值,然后关闭生产者