usm*_*man 4 java multithreading scala apache-kafka
我有一个多线程应用程序,它使用生产者类来生成消息,早些时候我使用下面的代码为每个请求创建生产者。其中 KafkaProducer 是用每个请求新建的,如下所示:
KafkaProducer<String, byte[]> producer = new KafkaProducer<String, byte[]>(prop);
ProducerRecord<String, byte[]> data = new ProducerRecord<String, byte[]>(topic, objBytes);
producer.send(data, new Callback() {
@Override
public void onCompletion(RecordMetadata metadata, Exception exception) {
if (exception != null) {
isValidMsg[0] = false;
exception.printStackTrace();
saveOrUpdateLog(msgBean, producerType, exception);
logger.error("ERROR:Unable to produce message.",exception);
}
}
});
producer.close();
Run Code Online (Sandbox Code Playgroud)
然后我阅读了关于生产者的 Kafka 文档,并知道我们应该使用单个生产者实例来获得良好的性能。
然后我在单例类中创建了 KafkaProducer 的单个实例。
现在我们应该在何时何地关闭生产者。显然,如果我们在第一次发送请求后关闭生产者,它不会找到生产者重新发送消息,因此抛出:
java.lang.IllegalStateException: Cannot send after the producer is closed.
Run Code Online (Sandbox Code Playgroud)
或者我们如何在关闭后重新连接到生产者。问题是如果程序崩溃或有异常呢?
一般情况下,调用close()的KafkaProducer是足够的,以确保所有的飞行记录已完成:
/**
* Close this producer. This method blocks until all previously sent requests complete.
* This method is equivalent to <code>close(Long.MAX_VALUE, TimeUnit.MILLISECONDS)</code>.
* <p>
* <strong>If close() is called from {@link Callback}, a warning message will be logged and close(0, TimeUnit.MILLISECONDS)
* will be called instead. We do this because the sender thread would otherwise try to join itself and
* block forever.</strong>
* <p>
*
* @throws InterruptException If the thread is interrupted while blocked
*/
Run Code Online (Sandbox Code Playgroud)
如果你的生产者在你的应用程序的整个生命周期中都被使用,在你得到终止信号之前不要关闭它,然后调用close(). 正如文档中所说,生产者在多线程环境中使用是安全的,因此您应该重用相同的实例。
如果您KafkaProducer在多个线程中共享,您有两种选择:
close(),同时通过注册一个回调关闭Runtime.getRuntime().addShutdownHook从你的主执行线程2 的粗略草图可能如下所示:
object KafkaOwner {
private var producer: KafkaProducer = ???
@volatile private var isClosed = false
def close(): Unit = {
if (!isClosed) {
kafkaProducer.close()
isClosed = true
}
}
def instance: KafkaProducer = {
this.synchronized {
if (!isClosed) producer
else {
producer = new KafkaProducer()
isClosed = false
}
}
}
}
Run Code Online (Sandbox Code Playgroud)
| 归档时间: |
|
| 查看次数: |
12624 次 |
| 最近记录: |