Wil*_*tte 7 apache-kafka kafka-producer-api
我正在使用 Kafka 0.11.0.0。我有一个发布到 Kafka 主题的测试程序;如果 zookeeper 和 Kafka 服务器关闭(这在我的开发环境中是正常的;我会根据需要启动它们),那么对 KafkaProducer<>.send() 的调用将无限期挂起。
我要么需要让 send() 返回,最好是指出错误;或者我需要一种方法来检查服务器是启动还是关闭。基本上,我希望我的测试工具能够告诉我,“嘿,傻瓜,启动 Kafka!” 而不是挂。
我的生产者任务有没有办法确定服务器是启动还是关闭?
我像这样调用 send() :
kafkaProducer.send(new ProducerRecord<>(KAFKA_TOPIC, KAFKA_KEY,
message), (rm, ex) -> {
System.out.println("**** " + rm + "\n**** " +ex);
});
Run Code Online (Sandbox Code Playgroud)
我有 linger.ms = 1; 我试过 retries=0、1 和 2,而 send() 仍然阻塞。我从未见过调用回调。
较旧的消息建议将 metadata.fetch.timeout.ms 设置为一个较小的值,但这在 0.11 中已消失。其他人建议调用命令行实用程序来查看服务器是否正常……但引用的实用程序似乎也不见了。
完成这项工作的优雅方式是什么?
我们可以通过三种方式向代理发送消息:
\n\n即发即忘: \n我们向服务器发送一条消息,并且\xe2\x80\x99 并不真正关心它是否成功到达。大多数情况下,它会成功到达,因为 Kafka 具有高可用性,并且生产者会自动重试发送消息。但是,使用此方法会丢失一些消息。
\n\n异步发送\n我们使用回调函数调用 send() 方法,该回调函数在收到来自 Kafka 代理的响应时被触发。
\n\n同步发送\n我们发送一条消息,send() 方法返回一个 Future 对象,我们使用 get() 等待 future 并查看 send() 是否成功。
\n\n最简单的同步发送消息的方式如下:
\n\nProducerRecord<String, String> record =\n new ProducerRecord<>(KAFKA_TOPIC, KEY, message);\n try {\n producer.send(record).get();\n } catch (Exception e) {\n e.printStackTrace();\n }\nRun Code Online (Sandbox Code Playgroud)\n\n在这里,我们使用Future.get()来等待 Kafka 的回复。如果记录没有成功发送到Kafka,该方法会抛出异常。如果没有错误,我们将获得一个 RecordMetadata 对象,我们可以使用它来检索消息写入的偏移量。
\n\n希望这可以帮助。
\n这很奇怪。它应该返回一条错误,指出“更新元数据失败”或“x 条记录过期”。
检查request.timeout.ms并max.block.ms设置您的制作人。默认时request.timeout.ms长为 60 秒
| 归档时间: |
|
| 查看次数: |
5652 次 |
| 最近记录: |