use*_*872 5 apache-kafka kafka-producer-api
我有一个kafka客户端代码连接到Kafka(服务器0.10.1和客户端是0.10.2)代理.代码中有2个主题和2个不同的消费者组,还有一个生产者.偶尔从生产者代码中获取NetworkException(一次在2天内,一次在5天内,......).我们看到消费者组(Re)在日志中加入了消费者组的信息,然后是生产者future.get()调用中的NetworkException.不知道为什么我们会收到此错误.
代码: -
final Future<RecordMetadata> futureResponse =
producer.send(new ProducerRecord<>("ping_topic", "ping"));
futureResponse.get();
Run Code Online (Sandbox Code Playgroud)
例外: -
org.apache.kafka.common.errors.NetworkException: The server disconnected before a response was received.
java.util.concurrent.ExecutionException: org.apache.kafka.common.errors.NetworkException: The server disconnected before a response was received.
at org.apache.kafka.clients.producer.internals.FutureRecordMetadata.valueOrError(FutureRecordMetadata.java:70)
at org.apache.kafka.clients.producer.internals.FutureRecordMetadata.get(FutureRecordMetadata.java:57)
at org.apache.kafka.clients.producer.internals.FutureRecordMetadata.get(FutureRecordMetadata.java:25)
Run Code Online (Sandbox Code Playgroud)
NetworkException的Kafka API定义,
"在发出请求时发生了与网络相关的错误IOException.这可能是因为客户端的元数据已经过时,它正在向一个现在已经死亡的节点发出请求."
谢谢
我在测试 Kafka Consumer 时遇到了同样的错误。我使用的是发件人模板。在消费者配置中,我另外设置了以下属性:
props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, 15000);
Run Code Online (Sandbox Code Playgroud)
发送消息后,我添加了一个线程睡眠:
ListenableFuture<SendResult<String, String>> future =
senderTemplate.send(MyConsumer.TOPIC_NAME, jsonPayload);
Thread.Sleep(10000).
Run Code Online (Sandbox Code Playgroud)
有必要进行测试,但可能不适合您的情况。
| 归档时间: |
|
| 查看次数: |
1429 次 |
| 最近记录: |