我有一个像这样的卡夫卡制片人
public KafkaMessageProducer(String kafkaHost, String kafkaPort, Map<String, String> map) {
this.kafkaTopic = map;
final Properties properties = new Properties();
properties.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
properties.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
properties.put("bootstrap.servers", kafkaHost + ":" + kafkaPort);
producer = new KafkaProducer<String, String>(properties);
}
Run Code Online (Sandbox Code Playgroud)
我正在使用以下代码发送消息。(也尝试使用回调)。
public void sendMessage(String topic, RestCommonResource resultToken) {
ObjectMapper objectMapper = new ObjectMapper();
JsonNode jsonNode = objectMapper.valueToTree(resultToken);
ProducerRecord<String, String> record = new ProducerRecord<String, String>(topic, jsonNode.toString());
producer.send(record);
}
Run Code Online (Sandbox Code Playgroud)
但是,如果kafka服务器已关闭,并且生产者发布了一条消息,则程序将陷入无限循环,但出现以下异常:
WARN [2018-09-13 06:27:59,589] org.apache.kafka.common.network.Selector: Error in I/O with localhost/127.0.0.1
! java.net.ConnectException: Connection refused: no further information
! at sun.nio.ch.SocketChannelImpl.checkConnect(Native Method) ~[na:1.7.0_80]
! at sun.nio.ch.SocketChannelImpl.finishConnect(SocketChannelImpl.java:744) ~[na:1.7.0_80]
! at org.apache.kafka.common.network.Selector.poll(Selector.java:238) ~[kafka-clients-0.8.2.1.jar:na]
! at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:192) [kafka-clients-0.8.2.1.jar:na]
! at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:191) [kafka-clients-0.8.2.1.jar:na]
! at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:122) [kafka-clients-0.8.2.1.jar:na]
! at java.lang.Thread.run(Thread.java:745) [na:1.7.0_80]
Run Code Online (Sandbox Code Playgroud)
如果可以设置任何属性来停止此重试并删除该消息。
目前,如果 Kafka 客户端失去与代理的连接,它将等待reconnect.backoff.ms毫秒,然后再尝试重新连接。
虽然这种策略在客户端短时间断开连接时效果很好,但如果单个代理或整个集群长时间不可用,则所有客户端都会快速生成大量连接。
此外,开发人员对不断失去与集群连接的客户端的控制有限。
我认为这个主题对您有用:Add custompolicy for reconnect attempts to NetworkdClient
reconnect.backoff.ms :尝试重新连接到给定主机之前等待的基本时间量。这避免了在紧密循环中重复连接到主机。此退避适用于客户端与代理的所有连接尝试。
reconnect.backoff.max.ms:重新连接到多次连接失败的代理时等待的最长时间(以毫秒为单位)。如果提供,每个主机的退避将在每次连续连接失败时呈指数增加,直至达到此最大值。计算退避增量后,添加 20% 的随机抖动以避免连接风暴。
| 归档时间: |
|
| 查看次数: |
2020 次 |
| 最近记录: |