Hec*_*tor 8 exception-handling apache-kafka kafka-producer-api
我的Apache Kafka制作人(0.9.0.1)间歇地抛出一个
org.apache.kafka.common.errors.NotLeaderForPartitionException
Run Code Online (Sandbox Code Playgroud)
我执行Kafka发送的代码类似于此
final Future<RecordMetadata> futureRecordMetadata = KAFKA_PRODUCER.send(new ProducerRecord<String, String>(kafkaTopic, UUID.randomUUID().toString(), jsonMessage));
try {
futureRecordMetadata.get();
} catch (final InterruptedException interruptedException) {
interruptedException.printStackTrace();
throw new RuntimeException("sendKafkaMessage(): Failed due to InterruptedException(): " + sourceTableName + " " + interruptedException.getMessage());
} catch (final ExecutionException executionException) {
executionException.printStackTrace();
throw new RuntimeException("sendKafkaMessage(): Failed due to ExecutionException(): " + sourceTableName + " " + executionException.getMessage());
}
Run Code Online (Sandbox Code Playgroud)
我赶上NotLeaderForPartitionException
了catch (final ExecutionException executionException) {}
街区.
可以忽略这个特殊的例外吗?
我的Kafka邮件是否已成功发送?
Mat*_*Sax 14
如果收到NotLeaderForPartitionException
,则表示您的数据未成功写入.
每个主题分区由一个或多个代理存储(一个领导者;其余代理称为跟随者),具体取决于您的复制因子.生产者需要向领导者Broker发送新消息(数据复制到追随者内部发生).
您的生产者客户端没有连接到正确的代理,即跟随者而不是领导者(或者甚至不再是跟随者的代理),并且此代理拒绝您的发送请求.如果领导者发生了变化,但生产者仍然过时了关于哪个经纪人是分区领导者的缓存元数据,就会发生这种情况.