Apache Kafka客户端何时抛出"批量过期"异常?

Jam*_*mas 36 java apache-kafka message-hub ibm-cloud

使用Apache Kafka Java客户端(0.9),我正在尝试使用Kafka Producer类向代理发送一长串记录.

异步发送方法立即返回一段时间,然后在短时间内开始阻塞每次调用.大约三十秒后,客户端开始抛出异常(TimeoutException),并显示消息"Batch expired".

什么情况导致抛出此异常?

Jam*_*mas 47

此异常表示您以比发送速度更快的速率排队记录.

当您调用send方法时,ProducerRecord将存储在内部缓冲区中以发送给代理.一旦ProducerRecord被缓冲,该方法立即返回,无论它是否已被发送.

记录被分组为批次以发送给代理,以减少每条消息的传输被窃听并增加吞吐量.

添加批处理记录后,发送该批处理有一个时间限制,以确保它在指定的持续时间内发送.这由Producer配置参数request.timeout.ms控制,默认为30秒.

如果批处理的排队时间超过超时限制,则将引发异常.该批次中的记录将从发送队列中删除.

使用配置参数增加超时限制将允许客户端在到期之前将批次排队更长时间.


Rob*_*rto 31

我在一个完全不同的背景下得到了这个例外.

我已经设置了一个由zookeeper vm,一个代理vm和一个生产者/消费者vm组成的迷你集群.我打开了服务器(9092)和zookeeper(2181)上的所有必要端口,然后尝试将消费者/发布者vm的消息发布到代理.我得到了OP提到的异常,但由于到目前为止我只发布了一条消息(或者至少我试过),解决方案无法增加超时或批量大小.所以我搜索并发现这个邮件列表描述了我在尝试使用消费者/生产者vm(ClosedChannelException)中的消息时遇到的类似问题:http://grokbase.com/t/kafka/users/152jsjekrm/having-trouble -with-the-simplest-remote-kafka-config 此邮件列表中的最后一篇文章实际上描述了如何解决问题.

简而言之,如果您同时面对ChannelClosedExceptionBatch Expired例外,您可能必须将此行更改为server.config文件中的以下内容并重新启动代理:

advertised.host.name=<broker public IP address>
Run Code Online (Sandbox Code Playgroud)

如果没有设置,它将回退到host.name属性(可能既没有设置),然后又回到InetAddressJava类的规范主机名,当然最终不正确,从而混淆了远程节点.