如果我不关闭 kafka 生产者会发生什么

Ale*_*sta 7 java apache-kafka kafka-producer-api

我正在处理 xml,我需要为每条记录发送一条消息,当我收到最后一条记录时,我关闭了 kafka 生产者,这里的问题是 kafka 生产者的发送方法是异步的,因此,有时当我关闭生产者时它trowsjava.lang.IllegalStateException: Cannot send after the producer is closed.我读的地方,我可以离开了制片人开放。我的问题是:这意味着什么,或者是否有更好的解决方案。

- -编辑 - -

<list>
  <element attr1="" att2="" attr3=""/>
  <element attr1="" att2="" attr3=""/>
  <element attr1="" att2="" attr3=""/>
  <element attr1="" att2="" attr3=""/>
  <element attr1="" att2="" attr3=""/>
  <element attr1="" att2="" attr3=""/>
  <element attr1="" att2="" attr3=""/>
  <element attr1="" att2="" attr3=""/>
...
</list>
Run Code Online (Sandbox Code Playgroud)

想象以下场景:

  • 我们读取标签并创建 kafka 生产者
  • 我们读取每个元素的属性,生成一个 json 对象并使用 send 方法将其发送到 kafka。- 当我们读取元素时,我们在生产者中调用 close 方法

问题是元素的数量可能是 80k,因此,有时当我们调用 disconnect 方法时,它会继续以异步方式发送消息。所以我们需要先调用flush方法,但是会影响性能

Mat*_*Sax 7

你应该在打电话Producer.flush()之前先打电话Producer.close()。这是一个阻塞调用,不会在所有记录发送之前返回。

如果您不调用close(),根据实现/语言,您可能最终会出现资源/内存泄漏。