Hug*_*ona · 8 · tags: java, apache-kafka, kafka-consumer-api
I am creating multiple consumers (using the Kafka 0.9 Java API), one per thread, and after each thread starts I get the following exception:
Consumer has failed with exception: org.apache.kafka.clients.consumer.CommitFailedException: Commit cannot be completed due to group rebalance
class com.messagehub.consumer.Consumer is shutting down.
org.apache.kafka.clients.consumer.CommitFailedException: Commit cannot be completed due to group rebalance
at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator$OffsetCommitResponseHandler.handle(ConsumerCoordinator.java:546)
at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator$OffsetCommitResponseHandler.handle(ConsumerCoordinator.java:487)
at org.apache.kafka.clients.consumer.internals.AbstractCoordinator$CoordinatorResponseHandler.onSuccess(AbstractCoordinator.java:681)
at org.apache.kafka.clients.consumer.internals.AbstractCoordinator$CoordinatorResponseHandler.onSuccess(AbstractCoordinator.java:654)
at org.apache.kafka.clients.consumer.internals.RequestFuture$1.onSuccess(RequestFuture.java:167)
at org.apache.kafka.clients.consumer.internals.RequestFuture.fireSuccess(RequestFuture.java:133)
at org.apache.kafka.clients.consumer.internals.RequestFuture.complete(RequestFuture.java:107)
at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient$RequestFutureCompletionHandler.onComplete(ConsumerNetworkClient.java:350)
at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:288)
at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.clientPoll(ConsumerNetworkClient.java:303)
at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:197)
at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:187)
at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:157)
at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.commitOffsetsSync(ConsumerCoordinator.java:352)
at org.apache.kafka.clients.consumer.KafkaConsumer.commitSync(KafkaConsumer.java:936)
at org.apache.kafka.clients.consumer.KafkaConsumer.commitSync(KafkaConsumer.java:905)
The consumer then resumes consuming messages normally. I would like to know what causes this exception so I can fix it.
ajk*_*ret · 11
Try also tweaking the parameters that control this behavior. I noticed that a rebalance occurs if the consumer does not commit to Kafka before the heartbeat times out. If the commit happens only after the messages have been processed, then the time needed to process them determines these parameters: reducing the number of messages fetched per poll and increasing the heartbeat timeout both help to avoid rebalances.
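The tuning above maps to a couple of consumer configuration keys. A minimal sketch, assuming a local broker at `localhost:9092` and a hypothetical group id `my-group` (the two relevant knobs here are `session.timeout.ms` and `max.partition.fetch.bytes`, which exist in the 0.9-era consumer; the exact values are illustrative, not recommendations):

```java
import java.util.Properties;

public class ConsumerConfigSketch {

    public static Properties tunedProps() {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092"); // assumption: local broker
        props.put("group.id", "my-group");                // hypothetical group id
        // Give the consumer more time before the coordinator considers it
        // dead and triggers a rebalance
        props.put("session.timeout.ms", "30000");
        // Fetch less data per partition per poll so each processing cycle
        // finishes (and commits) well within the session timeout
        props.put("max.partition.fetch.bytes", "65536");
        return props;
    }

    public static void main(String[] args) {
        System.out.println(tunedProps().getProperty("session.timeout.ms"));
    }
}
```

These properties would then be passed to the `KafkaConsumer` constructor as usual.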
Also consider using more partitions: there will be more threads processing your data, even with fewer messages per poll.
I wrote this small application to run these tests. Hope it helps.
https://github.com/ajkret/kafka-sample
UPDATE
Kafka 0.10.x now offers a new parameter to control the number of messages received: max.poll.records, the maximum number of records returned in a single call to poll().
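This setting is just another entry in the consumer's configuration. A small sketch (the cap of 100 records is an illustrative value, not a recommendation):

```java
import java.util.Properties;

public class MaxPollRecordsSketch {

    public static Properties capRecords(Properties props) {
        // Kafka 0.10.x and later: at most this many records are returned
        // per poll(), keeping each processing cycle short enough that the
        // next poll() (and its heartbeat) happens in time
        props.put("max.poll.records", "100");
        return props;
    }

    public static void main(String[] args) {
        Properties props = capRecords(new Properties());
        System.out.println(props.getProperty("max.poll.records"));
    }
}
```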
UPDATE
Kafka also offers a way to pause the queue. While the queue is paused, you can process the messages in a separate thread while continuing to call KafkaConsumer.poll() to send heartbeats, then call KafkaConsumer.resume() once processing is finished. This mitigates rebalances caused by missed heartbeats. Here is an outline of what you can do:
while (true) {
    ConsumerRecords<String, String> records = consumer.poll(Integer.MAX_VALUE);
    consumer.commitSync();
    // Pause all assigned partitions (Collection overload, Kafka 0.10+):
    // subsequent poll() calls send heartbeats but return no new records
    consumer.pause(consumer.assignment());
    for (ConsumerRecord<String, String> record : records) {
        Future<Boolean> future = workers.submit(() -> {
            // Process the record
            return true;
        });
        while (true) {
            try {
                if (future.get(1, TimeUnit.SECONDS) != null) {
                    break;
                }
            } catch (java.util.concurrent.TimeoutException e) {
                // Worker still busy: poll to keep the heartbeat alive
                consumer.poll(0);
            } catch (InterruptedException | ExecutionException e) {
                throw new RuntimeException(e);
            }
        }
    }
    consumer.resume(consumer.assignment());
}
Viewed 10065 times.