当 kafka 服务器关闭时,Kafka 生产者无限期地发送块

Wil*_*tte 7 apache-kafka kafka-producer-api

我正在使用 Kafka 0.11.0.0。我有一个发布到 Kafka 主题的测试程序;如果 zookeeper 和 Kafka 服务器关闭(这在我的开发环境中是正常的;我会根据需要启动它们),那么对 KafkaProducer<>.send() 的调用将无限期挂起。

我要么需要让 send() 返回,最好是指出错误;或者我需要一种方法来检查服务器是启动还是关闭。基本上,我希望我的测试工具能够告诉我,“嘿,傻瓜,启动 Kafka!” 而不是挂。

我的生产者任务有没有办法确定服务器是启动还是关闭?

我像这样调用 send() :

kafkaProducer.send(new ProducerRecord<>(KAFKA_TOPIC, KAFKA_KEY,
    message), (rm, ex) -> {
        System.out.println("**** " + rm + "\n**** " +ex);
});
Run Code Online (Sandbox Code Playgroud)

我有 linger.ms = 1; 我试过 retries=0、1 和 2,而 send() 仍然阻塞。我从未见过调用回调。

较旧的消息建议将 metadata.fetch.timeout.ms 设置为一个较小的值,但这在 0.11 中已消失。其他人建议调用命令行实用程序来查看服务器是否正常……但引用的实用程序似乎也不见了。

完成这项工作的优雅方式是什么?

sha*_*eel 6

我们可以通过三种方式向代理发送消息:

\n\n

即发即忘: \n我们向服务器发送一条消息,并且\xe2\x80\x99 并不真正关心它是否成功到达。大多数情况下,它会成功到达,因为 Kafka 具有高可用性,并且生产者会自动重试发送消息。但是,使用此方法会丢失一些消息。

\n\n

异步发送\n我们使用回调函数调用 send() 方法,该回调函数在收到来自 Kafka 代理的响应时被触发。

\n\n

同步发送\n我们发送一条消息,send() 方法返回一个 Future 对象,我们使用 get() 等待 future 并查看 send() 是否成功。

\n\n

最简单的同步发送消息的方式如下:

\n\n
ProducerRecord<String, String> record =\n            new ProducerRecord<>(KAFKA_TOPIC, KEY, message);\n    try {\n            producer.send(record).get();\n    } catch (Exception e) {\n             e.printStackTrace();\n    }\n
Run Code Online (Sandbox Code Playgroud)\n\n

在这里,我们使用Future.get()来等待 Kafka 的回复。如果记录没有成功发送到Kafka,该方法会抛出异常。如果没有错误,我们将获得一个 RecordMetadata 对象,我们可以使用它来检索消息写入的偏移量。

\n\n

希望这可以帮助。

\n


Sha*_*s88 5

这很奇怪。它应该返回一条错误,指出“更新元数据失败”或“x 条记录过期”。

检查request.timeout.msmax.block.ms设置您的制作人。默认时request.timeout.ms长为 60 秒