Kafka CommitFailedException consumer exception

Hug*_*ona 8 java apache-kafka kafka-consumer-api

After creating multiple consumers (using the Kafka 0.9 Java API) and starting each one on its own thread, I get the following exception:

Consumer has failed with exception: org.apache.kafka.clients.consumer.CommitFailedException: Commit cannot be completed due to group rebalance
class com.messagehub.consumer.Consumer is shutting down.
org.apache.kafka.clients.consumer.CommitFailedException: Commit cannot be completed due to group rebalance
at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator$OffsetCommitResponseHandler.handle(ConsumerCoordinator.java:546)
at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator$OffsetCommitResponseHandler.handle(ConsumerCoordinator.java:487)
at org.apache.kafka.clients.consumer.internals.AbstractCoordinator$CoordinatorResponseHandler.onSuccess(AbstractCoordinator.java:681)
at org.apache.kafka.clients.consumer.internals.AbstractCoordinator$CoordinatorResponseHandler.onSuccess(AbstractCoordinator.java:654)
at org.apache.kafka.clients.consumer.internals.RequestFuture$1.onSuccess(RequestFuture.java:167)
at org.apache.kafka.clients.consumer.internals.RequestFuture.fireSuccess(RequestFuture.java:133)
at org.apache.kafka.clients.consumer.internals.RequestFuture.complete(RequestFuture.java:107)
at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient$RequestFutureCompletionHandler.onComplete(ConsumerNetworkClient.java:350)
at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:288)
at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.clientPoll(ConsumerNetworkClient.java:303)
at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:197)
at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:187)
at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:157)
at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.commitOffsetsSync(ConsumerCoordinator.java:352)
at org.apache.kafka.clients.consumer.KafkaConsumer.commitSync(KafkaConsumer.java:936)
at org.apache.kafka.clients.consumer.KafkaConsumer.commitSync(KafkaConsumer.java:905)

After that, the consumers start consuming messages normally. I would like to know what causes this exception so that I can fix it.

ajk*_*ret 11

Also try tuning the following parameters:

  • heartbeat.interval.ms - how often the consumer sends heartbeats to Kafka; the related session.timeout.ms is how many milliseconds Kafka waits without a heartbeat before considering the consumer "dead"
  • max.partition.fetch.bytes - an upper bound on the amount of data the consumer receives per partition in a single poll

I noticed that a rebalance occurs if the consumer does not commit to Kafka before the heartbeat times out. If the commit happens after the messages are processed, the time needed to process them dictates these parameters. So reducing the number of messages fetched per poll and increasing the heartbeat timeout will help avoid rebalances.
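The advice above can be sketched as a consumer configuration. This is a minimal illustration, not values from the answer: the class name, broker address, group id, and every numeric value here are assumptions to be tuned against your own per-batch processing time.

```java
import java.util.Properties;

public class ConsumerTuningSketch {
    // Hypothetical tuning values -- choose them to match how long your batches take to process.
    static Properties tunedProps() {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");   // assumed broker address
        props.put("group.id", "my-group");                  // assumed group id
        props.put("session.timeout.ms", "30000");           // time without a heartbeat before the consumer is declared dead
        props.put("heartbeat.interval.ms", "3000");         // heartbeat frequency, kept well below the session timeout
        props.put("max.partition.fetch.bytes", "65536");    // cap on data fetched per partition per poll
        return props;
    }
}
```

The key relationship is that heartbeat.interval.ms must be comfortably smaller than session.timeout.ms, and the session timeout must exceed your worst-case processing time between polls.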

Also consider using more partitions, so that more threads can process your data even though each poll returns fewer messages.

I wrote this small application to run some tests. Hope it helps.

https://github.com/ajkret/kafka-sample

UPDATE

Kafka 0.10.x now offers a new parameter to control the number of messages received: max.poll.records - the maximum number of records returned in a single call to poll().
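Setting it is a one-line addition to the consumer properties. A small sketch, assuming a value of 100 (the class name and value are illustrative, and the property only takes effect on Kafka 0.10+ clients):

```java
import java.util.Properties;

public class MaxPollRecordsSketch {
    // Kafka 0.10+ only; 100 is an illustrative value, not a recommendation.
    static Properties withMaxPollRecords(Properties props) {
        props.put("max.poll.records", "100"); // at most 100 records per poll() call
        return props;
    }
}
```

A smaller batch per poll shortens the processing time between polls, which makes it easier to stay inside the session timeout.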

UPDATE

Kafka offers a way to pause the consumer. While it is paused, you can process the messages in a separate thread and keep calling KafkaConsumer.poll() so that heartbeats are still sent, then call KafkaConsumer.resume() once processing is finished. This mitigates rebalances caused by missed heartbeats. Here is an outline of what you can do:

while (true) {
    ConsumerRecords&lt;String, String> records = consumer.poll(Long.MAX_VALUE);

    // Pause all assigned partitions so the keep-alive poll() below fetches nothing
    consumer.pause(consumer.assignment());

    for (ConsumerRecord&lt;String, String> record : records) {
        // Hand the record to a worker thread (workers is an ExecutorService)
        Future&lt;Boolean> future = workers.submit(() -> {
            // ... process record ...
            return true;
        });

        // Keep polling while the worker runs, so heartbeats are still sent
        while (true) {
            try {
                if (future.get(1, TimeUnit.SECONDS) != null) {
                    break;
                }
            } catch (TimeoutException e) {
                consumer.poll(0); // returns no records while paused, but keeps the session alive
            } catch (InterruptedException | ExecutionException e) {
                throw new RuntimeException(e);
            }
        }
    }

    consumer.commitSync(); // commit offsets only after the batch has been processed
    consumer.resume(consumer.assignment());
}