小编Wil*_*tte的帖子

当 kafka 服务器关闭时,Kafka 生产者无限期地发送块

我正在使用 Kafka 0.11.0.0。我有一个发布到 Kafka 主题的测试程序;如果 zookeeper 和 Kafka 服务器关闭(这在我的开发环境中是正常的;我会根据需要启动它们),那么对 KafkaProducer<>.send() 的调用将无限期挂起。

我要么需要让 send() 返回,最好是指出错误;或者我需要一种方法来检查服务器是启动还是关闭。基本上,我希望我的测试工具能够告诉我,“嘿,傻瓜,启动 Kafka!” 而不是挂。

我的生产者任务有没有办法确定服务器是启动还是关闭?

我像这样调用 send() :

kafkaProducer.send(new ProducerRecord<>(KAFKA_TOPIC, KAFKA_KEY,
    message), (rm, ex) -> {
        System.out.println("**** " + rm + "\n**** " +ex);
});
Run Code Online (Sandbox Code Playgroud)

我有 linger.ms = 1; 我试过 retries=0、1 和 2,而 send() 仍然阻塞。我从未见过调用回调。

较旧的消息建议将 metadata.fetch.timeout.ms 设置为一个较小的值,但这在 0.11 中已消失。其他人建议调用命令行实用程序来查看服务器是否正常……但引用的实用程序似乎也不见了。

完成这项工作的优雅方式是什么?

apache-kafka kafka-producer-api

7
推荐指数
2
解决办法
5652
查看次数

标签 统计

apache-kafka ×1

kafka-producer-api ×1