关闭后如何重新连接kafka生产者?

usm*_*man 4 java multithreading scala apache-kafka

我有一个多线程应用程序,它使用生产者类来生成消息,早些时候我使用下面的代码为每个请求创建生产者。其中 KafkaProducer 是用每个请求新建的,如下所示:

KafkaProducer<String, byte[]> producer = new KafkaProducer<String, byte[]>(prop);

ProducerRecord<String, byte[]> data = new ProducerRecord<String, byte[]>(topic, objBytes);
producer.send(data, new Callback() {

                @Override
                public void onCompletion(RecordMetadata metadata, Exception exception) {
                    if (exception != null) {
                        isValidMsg[0] = false;
                        exception.printStackTrace();
                        saveOrUpdateLog(msgBean, producerType, exception);
                        logger.error("ERROR:Unable to produce message.",exception);
                    }
                }
            });
producer.close();
Run Code Online (Sandbox Code Playgroud)

然后我阅读了关于生产者的 Kafka 文档,并知道我们应该使用单个生产者实例来获得良好的性能。

然后我在单例类中创建了 KafkaProducer 的单个实例。

现在我们应该在何时何地关闭生产者。显然,如果我们在第一次发送请求后关闭生产者,它不会找到生产者重新发送消息,因此抛出:

java.lang.IllegalStateException: Cannot send after the producer is closed.
Run Code Online (Sandbox Code Playgroud)

或者我们如何在关闭后重新连接到生产者。问题是如果程序崩溃或有异常呢?

Yuv*_*kov 5

一般情况下,调用close()KafkaProducer是足够的,以确保所有的飞行记录已完成:

/**
 * Close this producer. This method blocks until all previously sent requests complete.
 * This method is equivalent to <code>close(Long.MAX_VALUE, TimeUnit.MILLISECONDS)</code>.
 * <p>
 * <strong>If close() is called from {@link Callback}, a warning message will be logged and close(0, TimeUnit.MILLISECONDS)
 * will be called instead. We do this because the sender thread would otherwise try to join itself and
 * block forever.</strong>
 * <p>
 *
 * @throws InterruptException If the thread is interrupted while blocked
 */
Run Code Online (Sandbox Code Playgroud)

如果你的生产者在你的应用程序的整个生命周期中都被使用,在你得到终止信号之前不要关闭它,然后调用close(). 正如文档中所说,生产者在多线程环境中使用是安全的,因此您应该重用相同的实例。

如果您KafkaProducer在多个线程中共享,您有两种选择:

  1. 呼叫close(),同时通过注册一个回调关闭Runtime.getRuntime().addShutdownHook从你的主执行线程
  2. 让您的多线程方法竞相关闭,只允许一个方法获胜。

2 的粗略草图可能如下所示:

object KafkaOwner {
  private var producer: KafkaProducer = ???
  @volatile private var isClosed = false
     
  def close(): Unit = {
    if (!isClosed) {
      kafkaProducer.close()
      isClosed = true
    }
  }
    
  def instance: KafkaProducer = {
    this.synchronized {
      if (!isClosed) producer 
      else {
        producer = new KafkaProducer()
        isClosed = false
      }
    }
  }
}
Run Code Online (Sandbox Code Playgroud)