Getting error: Batch containing 3 record(s) expired due to timeout while requesting metadata from brokers for test2R2P2-1

San*_*ngh 6 apache-kafka

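I get an error when running the producer client, which reads its messages from the input file kafka_message.log. Each message in this log file is 4096 bytes long, and there are 100000 records per second.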

Error:

[2017-01-09 14:45:24,813] ERROR Error when sending message to topic test2R2P2 with key: null, value: 4096 bytes with error: (org.apache.kafka.clients.producer.internals.ErrorLoggingCallback)
org.apache.kafka.common.errors.TimeoutException: Batch containing 3 record(s) expired due to timeout while requesting metadata from brokers for test2R2P2-0
[2017-01-09 14:45:24,816] ERROR Error when sending message to topic test2R2P2 with key: null, value: 4096 bytes with error: (org.apache.kafka.clients.producer.internals.ErrorLoggingCallback)
org.apache.kafka.common.errors.TimeoutException: Batch containing 3 record(s) expired due to timeout while requesting metadata from brokers for test2R2P2-0
[2017-01-09 14:45:24,816] ERROR Error when sending message to topic test2R2P2 with key: null, value: 4096 bytes with error: (org.apache.kafka.clients.producer.internals.ErrorLoggingCallback)
org.apache.kafka.common.errors.TimeoutException: Batch containing 3 record(s) expired due to timeout while requesting metadata from brokers for test2R2P2-0
[2017-01-09 14:45:24,816] ERROR Error when sending message to topic test2R2P2 with key: null, value: 4096 bytes with error: (org.apache.kafka.clients.producer.internals.ErrorLoggingCallback)
org.apache.kafka.common.errors.TimeoutException: Batch containing 3 record(s) expired due to timeout while requesting metadata from brokers for test2R2P2-0
[2017-01-09 14:45:24,816] ERROR Error when sending message to topic test2R2P2 with key: null, value: 4096 bytes with error: (org.apache.kafka.clients.producer.internals.ErrorLoggingCallback)
org.apache.kafka.common.errors.TimeoutException: Batch containing 3 record(s) expired due to timeout while requesting metadata from brokers for test2R2P2-0

The command I ran:

$ bin/kafka-console-producer.sh --broker-list x.x.x.x:xxxx,x.x.x.x:xxxx --batch-size 1000 --message-send-max-retries 10 --request-required-acks 1 --topic test2R2P2 <~/kafka_message.log

There are 2 brokers running, and the topic has partitions = 2 and replication factor = 2.

Can someone help me understand what this error means? I also see message loss: not all messages from the input file end up in the topic.

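On a separate note: I see data loss when I run kafka-producer-perf-test.sh and kill one of the brokers (in a 3-node cluster) while the test is running. Is this expected behavior? I see the same result across multiple test runs.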

The commands I ran:

Describe the topic:

 $ bin/kafka-topics.sh  --zookeeper x.x.x.x:2181/kafka-framework --describe |grep test4
Topic:test4R2P2 PartitionCount:2        ReplicationFactor:2     Configs:
        Topic: test4R2P2        Partition: 0    Leader: 0       Replicas: 1,0   Isr: 0,1
        Topic: test4R2P2        Partition: 1    Leader: 0       Replicas: 0,1   Isr: 0,1

Run the perf test:

$ bin/kafka-producer-perf-test.sh --num-records 100000 --record-size 4096  --throughput 1000  --topic test4R2P2 --producer-props bootstrap.servers=x.x.x.x:xxxx,x.x.x.x:xxxx

Consumer command:

$ bin/kafka-console-consumer.sh --zookeeper x.x.x.x:2181/kafka-framework --topic test4R2P2 1>~/kafka_message.log

Check the message count:

$ wc -l ~/kafka_message.log
399418 /home/montana/kafka_message.log

I see only 399418 messages in topic test4R2P2, although I produced 400000 messages in total by running the test 4 times.

Exceptions thrown by the perf command:

org.apache.kafka.common.errors.NetworkException: The server disconnected before a response was received.
org.apache.kafka.common.errors.NetworkException: The server disconnected before a response was received.

Exceptions thrown by the consumer command:

[2017-01-10 07:40:07,246] WARN [ConsumerFetcherThread-console-consumer-46599_node-44a8422fe1a0-1484033822261-f07d33d7-0-1], Error in fetch kafka.consumer.ConsumerFetcherThread$FetchRequest@695be565 (kafka.consumer.ConsumerFetcherThread)
[2017-01-10 07:40:07,472] WARN Fetching topic metadata with correlation id 1 for topics [Set(test4R2P2)] from broker [BrokerEndPoint(1,10.105.26.1,31052)] failed (kafka.client.ClientUtils$)
java.nio.channels.ClosedChannelException
[2017-01-10 07:42:23,073] WARN [ConsumerFetcherThread-console-consumer-46599_node-44a8422fe1a0-1484033822261-f07d33d7-0-0], Error in fetch kafka.consumer.ConsumerFetcherThread$FetchRequest@7bd94073 (kafka.consumer.ConsumerFetcherThread)
[2017-01-10 07:44:58,195] WARN [ConsumerFetcherThread-console-consumer-46599_node-44a8422fe1a0-1484033822261-f07d33d7-0-1], Error in fetch kafka.consumer.ConsumerFetcherThread$FetchRequest@2855ee73 (kafka.consumer.ConsumerFetcherThread)
[2017-01-10 07:44:58,404] WARN Fetching topic metadata with correlation id 3 for topics [Set(test4R2P2)] from broker [BrokerEndPoint(1,10.105.26.1,31052)] failed (kafka.client.ClientUtils$)
java.nio.channels.ClosedChannelException
[2017-01-10 07:45:47,127] WARN [ConsumerFetcherThread-console-consumer-46599_node-44a8422fe1a0-1484033822261-f07d33d7-0-0], Error in fetch kafka.consumer.ConsumerFetcherThread$FetchRequest@f8887da (kafka.consumer.ConsumerFetcherThread)
[2017-01-10 07:50:56,291] ERROR [ConsumerFetcherThread-console-consumer-46599_node-44a8422fe1a0-1484033822261-f07d33d7-0-1], Error for partition [test4R2P2,1] to broker 1:kafka.common.NotLeaderForPartitionException (kafka.consumer.ConsumerFetcherThread)

Den*_*din 0

According to the comments, @amethystic's suggestion seems to have worked:

...you can increase the value of "request.timeout.ms"...
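
As a rough sketch of how that suggestion could be applied to the two commands from the question: the value 60000 is an arbitrary illustration and does not come from the thread. The sketch also assumes that your Kafka version's console producer accepts a `--request-timeout-ms` option (check its `--help` output), and that the perf tool picks up `request.timeout.ms` through `--producer-props`, the same mechanism the question already uses for `bootstrap.servers`. The script only prints the commands, so they can be reviewed before being run against a real cluster.

```shell
#!/bin/sh
# Assumed values for illustration; they do not come from the original thread.
REQUEST_TIMEOUT_MS=60000               # hypothetical; pick a value that suits your cluster
BROKERS="x.x.x.x:xxxx,x.x.x.x:xxxx"    # placeholder addresses, as in the question

# Console producer: assumes your version accepts --request-timeout-ms (check --help).
CONSOLE_CMD="bin/kafka-console-producer.sh --broker-list ${BROKERS} --request-timeout-ms ${REQUEST_TIMEOUT_MS} --topic test2R2P2"

# Perf test: request.timeout.ms is passed as a producer property via --producer-props.
PERF_CMD="bin/kafka-producer-perf-test.sh --num-records 100000 --record-size 4096 --throughput 1000 --topic test4R2P2 --producer-props bootstrap.servers=${BROKERS} request.timeout.ms=${REQUEST_TIMEOUT_MS}"

# Print the commands for review instead of running them against a live cluster.
echo "$CONSOLE_CMD"
echo "$PERF_CMD"
```

If batches still expire with a larger timeout, the brokers may simply be unable to keep up with the send rate, in which case raising the timeout only delays the error.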