Kafka offset commit fails with org.apache.kafka.clients.consumer.CommitFailedException

hit*_*esh 6 java apache-kafka spring-kafka

I have written a Kafka consumer using the spring-kafka library (spring-boot-starter-parent 2.3.4.RELEASE). My code contains the following consumer configuration:

    /**
     * configuration for kafka consumers at container level
     *
     * @return
     */
    @Bean
    KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<String, String>>
    kafkaListenerContainerFactory() {
        ConcurrentKafkaListenerContainerFactory<String, String> factory =
                new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory());
        factory.setConcurrency(1);
        factory.getContainerProperties().setAckMode(ContainerProperties.AckMode.MANUAL_IMMEDIATE);
        return factory;
    }

    /**
     * default kafka consumer factory
     *
     * @return
     */
    @Bean
    public ConsumerFactory<String, String> consumerFactory() {
        return new DefaultKafkaConsumerFactory<>(consumerConfigs());
    }

    /**
     * configuration for kafka consumer at thread level.
     *
     * @return
     */
    @Bean
    public Map<String, Object> consumerConfigs() {

        Map<String, Object> props = new HashMap<>();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, GenericDeserializer.class);
        props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
        return props;
    }

Here is my listener method:

    @KafkaListener(id = "client", topics = "MyTopic", clientIdPrefix = "client")
    public void listen(@Payload UserNotification data, Acknowledgment ack) {

        // Business logic

        ack.acknowledge();
    }

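Here I read one message at a time, apply the business logic, and commit the offset with ack.acknowledge(). Sometimes the offset commit succeeds, but often I get org.apache.kafka.clients.consumer.CommitFailedException on the ack.acknowledge() line. I can confirm that the business logic completes within 2 seconds at most. Here is the full exception: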

2022-06-03|04:27:04.326|INSTANCEID_IS_UNDEFINED|xyz-856495f857-8nqx7|client-0-C-1|ERROR||||o.s.k.l.KafkaMessageListenerContainer$ListenerConsumer|149|Error handler threw an exception
org.springframework.kafka.KafkaException: Seek to current after exception; nested exception is org.springframework.kafka.listener.ListenerExecutionFailedException: Listener method 'public void com.xyz.listen(java.lang.String,org.springframework.kafka.support.Acknowledgment)' threw exception; nested exception is org.apache.kafka.clients.consumer.CommitFailedException: Offset commit cannot be completed since the consumer is not part of an active group for auto partition assignment; it is likely that the consumer was kicked out of the group.; nested exception is org.apache.kafka.clients.consumer.CommitFailedException: Offset commit cannot be completed since the consumer is not part of an active group for auto partition assignment; it is likely that the consumer was kicked out of the group.
at org.springframework.kafka.listener.SeekUtils.seekOrRecover(SeekUtils.java:157)
at org.springframework.kafka.listener.SeekToCurrentErrorHandler.handle(SeekToCurrentErrorHandler.java:113)
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.invokeErrorHandler(KafkaMessageListenerContainer.java:2012)
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.doInvokeRecordListener(KafkaMessageListenerContainer.java:1911)
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.doInvokeWithRecords(KafkaMessageListenerContainer.java:1838)
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.invokeRecordListener(KafkaMessageListenerContainer.java:1735)
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.invokeListener(KafkaMessageListenerContainer.java:1465)
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.pollAndInvoke(KafkaMessageListenerContainer.java:1128)
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.run(KafkaMessageListenerContainer.java:1031)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at java.lang.Thread.run(Thread.java:748)

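Can someone help me understand why this happens? The default poll interval is 5 minutes, so if processing takes only 2 seconds, the group coordinator should not kick the consumer out.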

Gar*_*ell 10

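You must be able to process max.poll.records (default 500) within max.poll.interval.ms (default 300000 ms, i.e. 5 minutes). Your listener receives records one at a time, but the consumer fetches up to max.poll.records per poll(), and the next poll() only happens after every record in that batch has been processed.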

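If each record takes 2 seconds, processing a full batch can take up to 500 × 2 s = 1000 s ≈ 16.6667 minutes, and you will be kicked out of the group.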
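The arithmetic above can be checked with a small, dependency-free Java sketch. It assumes, as in the question, that the records from one poll are processed sequentially on the single consumer thread (concurrency 1) and that 2 s is the worst-case time per record. The class and method names (PollBudget, worstCaseBatch, safeMaxPollRecords) are hypothetical helpers, not part of Kafka or Spring.

```java
import java.time.Duration;

/**
 * Back-of-the-envelope check: does one full poll() batch fit inside max.poll.interval.ms?
 * Assumption: records from a single poll are processed one after another on the
 * consumer thread, so the gap between polls is roughly records x per-record time.
 */
public class PollBudget {

    /** Worst-case time to process one batch of maxPollRecords records. */
    public static Duration worstCaseBatch(int maxPollRecords, Duration perRecord) {
        return perRecord.multipliedBy(maxPollRecords);
    }

    /** Largest max.poll.records whose worst-case batch still fits (zero headroom). */
    public static int safeMaxPollRecords(Duration maxPollInterval, Duration perRecord) {
        return (int) (maxPollInterval.toMillis() / perRecord.toMillis());
    }

    public static void main(String[] args) {
        int maxPollRecords = 500;                          // Kafka default
        Duration maxPollInterval = Duration.ofMinutes(5);  // Kafka default (300000 ms)
        Duration perRecord = Duration.ofSeconds(2);        // worst case from the question

        Duration batch = worstCaseBatch(maxPollRecords, perRecord);
        System.out.printf("worst-case batch: %d s (%.4f min)%n",
                batch.getSeconds(), batch.toMillis() / 60_000.0);
        System.out.println("fits in max.poll.interval.ms? "
                + (batch.compareTo(maxPollInterval) <= 0));
        System.out.println("safe max.poll.records: "
                + safeMaxPollRecords(maxPollInterval, perRecord));
    }
}
```

With the defaults this reports a 1000 s (16.6667 min) batch that does not fit, and a break-even max.poll.records of 150. That value has no headroom at all, so in practice you would likely choose something well below it.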

Reduce max.poll.records and/or increase max.poll.interval.ms.
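A minimal sketch of those two overrides, built as a plain map that could be merged into the question's consumerConfigs(). It uses the literal Kafka property names so it runs without kafka-clients on the classpath; in the real Spring configuration, ConsumerConfig.MAX_POLL_RECORDS_CONFIG and ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG hold the same strings. The values (50 records, 10 minutes) and the class name TunedConsumerConfigs are illustrative assumptions, not tuned recommendations.

```java
import java.util.HashMap;
import java.util.Map;

public class TunedConsumerConfigs {

    /** Hypothetical helper: poll-related overrides to add to consumerConfigs(). */
    public static Map<String, Object> pollOverrides() {
        Map<String, Object> props = new HashMap<>();
        // Fetch fewer records per poll(): 50 records x 2 s = 100 s worst case per batch.
        props.put("max.poll.records", 50);
        // Optionally allow more time between polls: 10 minutes instead of the 5-minute default.
        props.put("max.poll.interval.ms", 600_000);
        return props;
    }

    public static void main(String[] args) {
        Map<String, Object> props = pollOverrides();
        int records = (Integer) props.get("max.poll.records");
        int intervalMs = (Integer) props.get("max.poll.interval.ms");
        // Assumes 2 s per record, as stated in the question.
        long worstCaseMs = records * 2_000L;
        System.out.println("worst case " + worstCaseMs + " ms vs budget " + intervalMs + " ms");
    }
}
```

Either change alone may be enough; lowering max.poll.records keeps the rebalance detection time short, while raising max.poll.interval.ms means a genuinely stuck consumer takes longer to be evicted.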