我可以忽略org.apache.kafka.common.errors.NotLeaderForPartitionExceptions吗?

Hec*_*tor 8 exception-handling apache-kafka kafka-producer-api

我的Apache Kafka制作人(0.9.0.1)间歇地抛出一个

org.apache.kafka.common.errors.NotLeaderForPartitionException
Run Code Online (Sandbox Code Playgroud)

我执行Kafka发送的代码类似于此

final Future<RecordMetadata> futureRecordMetadata = KAFKA_PRODUCER.send(new ProducerRecord<String, String>(kafkaTopic, UUID.randomUUID().toString(), jsonMessage));

try {
    futureRecordMetadata.get();
} catch (final InterruptedException interruptedException) {
    interruptedException.printStackTrace();
    throw new RuntimeException("sendKafkaMessage(): Failed due to InterruptedException(): " + sourceTableName + " " + interruptedException.getMessage());
} catch (final ExecutionException executionException) {
    executionException.printStackTrace();
    throw new RuntimeException("sendKafkaMessage(): Failed due to ExecutionException(): " + sourceTableName + " " + executionException.getMessage());
}
Run Code Online (Sandbox Code Playgroud)

我赶上NotLeaderForPartitionExceptioncatch (final ExecutionException executionException) {}街区.

可以忽略这个特殊的例外吗?

我的Kafka邮件是否已成功发送?

Mat*_*Sax 14

如果收到NotLeaderForPartitionException,则表示您的数据成功写入.

每个主题分区由一个或多个代理存储(一个领导者;其余代理称为跟随者),具体取决于您的复制因子.生产者需要向领导者Broker发送新消息(数据复制到追随者内部发生).

您的生产者客户端没有连接到正确的代理,即跟随者而不是领导者(或者甚至不再是跟随者的代理),并且此代理拒绝您的发送请求.如果领导者发生了变化,但生产者仍然过时了关于哪个经纪人是分区领导者的缓存元数据,就会发生这种情况.

  • @VinitGaikwad 生产者将首先在内部重试发送数据(并将刷新它的元数据以了解必须将数据发送到的新领导者)——因此,只有在所有重试都用尽时,应用程序才会收到此异常。因此,您可能希望增加生产者的 `retries` 配置参数。如果您想在应用程序级别处理它(与增加生产者重试相比,这似乎是第二好的选择),您需要调用 `Producer.send()` 来重新发送数据。 (2认同)