Spring Kafka 批量消费者的非阻塞重试

Mak*_*lov 3 java spring apache-kafka spring-boot spring-kafka

我正在使用 spring-kafka 2.8.0,我正在尝试为批量 kafka Consumer实现非阻塞重试。这是我的配置和消费者:

@Configuration
public class KafkaConfig {
    @Bean
    public KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<String, GenericRecord>> 
            batchListenerFactory(ConsumerFactory<Object, Object> consumerFactory) {
        ConcurrentKafkaListenerContainerFactory<String, GenericRecord> factory =
                new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory);
        factory.setBatchListener(true);
        return factory;
    }
}
Run Code Online (Sandbox Code Playgroud)
@Component
public class MyConsumer {

    @KafkaListener(
            topics = "my-topic",
            containerFactory = "batchListenerFactory"
    )
    @RetryableTopic(
            backoff = @Backoff(delay = 1000, multiplier = 2.0),
            attempts = "4",
            topicSuffixingStrategy = SUFFIX_WITH_INDEX_VALUE,
            autoCreateTopics = "false"
    )
    public void consume(List<ConsumerRecord<String, GenericRecord>> messages) {
        // do some stuff
    }

}
Run Code Online (Sandbox Code Playgroud)

但在 Stutup 上我遇到以下异常: java.lang.IllegalArgumentException: The provided class BatchMessagingMessageListenerAdapter is not assignable from AcknowledgingConsumerAwareMessageListener

我的问题是:

  1. 有什么办法可以将批量消费者与 结合起来吗@RetryableTopic

  2. 有没有其他方法可以为批量消费者实现非阻塞重试?是否可以用于RetryTemplate此目的?

Gar*_*ell 6

@RetryableTopic批处理侦听器不支持。

(适用于 2.8 及更高版本RecoveringBatchErrorHandlerDefaultErrorHandler支持将批次中的失败记录发送到死信主题,并在侦听器的帮助下抛出指示BatchListenerFailedException哪个记录失败的信息。

然后,您必须针对该主题实现您自己的侦听器。