CommitFailedException由于组已经重新平衡并将分区分配给另一个成员,因此无法完成提交

Sim*_* Su 20 java apache-kafka

我正在使用kafka 0.10.2,现在遇到了CommitFailedException.喜欢:

由于该组已经重新平衡并将分区分配给另一个成员,因此无法完成提交.这意味着后续调用poll()的时间长于配置的max.poll.interval.ms,这通常意味着轮询循环花费了太多时间进行消息处理.您可以通过增加会话超时或通过max.poll.records减少poll()中返回的批量的最大大小来解决此问题.

我已将max.poll.interval.ms设置为Integer.MAX_VALUE.所以任何人都可以告诉我为什么即使我设定了价值仍然会发生这种情况?

另一个问题是:我做的描述是将session.timeout.ms设置为60000并且它仍然会发生.我尝试通过简单的代码重现

 public static void main(String[] args) throws InterruptedException {     
        Logger logger = Logger.getLogger(KafkaConsumer10.class);
        logger.info("XX");
        Properties props = new Properties();
        props.put("bootstrap.servers", "kafka-broker:9098");
        props.put("group.id", "test");
        props.put("enable.auto.commit", "true");
        props.put("auto.commit.interval.ms", "1000");
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("max.poll.interval.ms", "300000");
        props.put("session.timeout.ms", "10000");
        props.put("max.poll.records", "2");
        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
        consumer.subscribe(Arrays.asList("t1"));
        while (true) {
            Thread.sleep(11000);
            ConsumerRecords<String, String> records = consumer.poll(100);
            //Thread.sleep(11000);
            Thread.sleep(11000);
            for (ConsumerRecord<String, String> record : records)
                System.out.printf("offset = %d, key = %s, value = %s%n", record.offset(), record.key(), record.value());
        }
Run Code Online (Sandbox Code Playgroud)

当我将session.timeout.ms设置为10000时,我尝试在我的轮询循环中睡眠超过10000毫秒,但它似乎工作,没有异常.所以我对此感到困惑.如果heartbeat是由consumer.poll和consumer.commit触发的,那么我的代码中的心跳超出会话超时.为什么不抛出CommitFailedException?

小智 17

session.timeout.ms消费者上的设置应该小于group.max.session.timeout.msKafka 代理上的设置。

这为我解决了这个问题。

归功于 github 链接提交失败

  • 也为我工作!group.max.session.timeout.ms 的默认值为 30000,我将 session.timeout.ms 设置为 25000。 (2认同)

Abh*_*nyu 5

您好为此,您需要处理代码中的重新平衡条件,并且应该处理正在进行的消息并在重新平衡之前提交它

喜欢 :

private class HandleRebalance implements ConsumerRebalanceListener {
        public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
            // Implement what you want to do once rebalancing is done.
        }

        public void onPartitionsRevoked(Collection<TopicPartition> partitions) {

           // commit current method

       }
    }
Run Code Online (Sandbox Code Playgroud)

并使用以下语法订阅该主题:

kafkaConsumer.subscribe(topicNameList,新的HandleRebalance())

这样做的好处:

  1. 重新平衡时,消息不会重复。

  2. 没有提交失败异常导致的异常