Mak*_*lov 3 java spring apache-kafka spring-boot spring-kafka
我正在使用 spring-kafka 2.8.0,我正在尝试为批量 kafka Consumer实现非阻塞重试。这是我的配置和消费者:
@Configuration
public class KafkaConfig {
@Bean
public KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<String, GenericRecord>>
batchListenerFactory(ConsumerFactory<Object, Object> consumerFactory) {
ConcurrentKafkaListenerContainerFactory<String, GenericRecord> factory =
new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(consumerFactory);
factory.setBatchListener(true);
return factory;
}
}
Run Code Online (Sandbox Code Playgroud)
@Component
public class MyConsumer {
@KafkaListener(
topics = "my-topic",
containerFactory = "batchListenerFactory"
)
@RetryableTopic(
backoff = @Backoff(delay = 1000, multiplier = 2.0),
attempts = "4",
topicSuffixingStrategy = SUFFIX_WITH_INDEX_VALUE,
autoCreateTopics = "false"
)
public void consume(List<ConsumerRecord<String, GenericRecord>> messages) {
// do some stuff
}
}
Run Code Online (Sandbox Code Playgroud)
但在 Stutup 上我遇到以下异常:
java.lang.IllegalArgumentException: The provided class BatchMessagingMessageListenerAdapter is not assignable from AcknowledgingConsumerAwareMessageListener
我的问题是:
有什么办法可以将批量消费者与 结合起来吗@RetryableTopic?
有没有其他方法可以为批量消费者实现非阻塞重试?是否可以用于RetryTemplate此目的?
@RetryableTopic批处理侦听器不支持。
(适用于 2.8 及更高版本RecoveringBatchErrorHandler)DefaultErrorHandler支持将批次中的失败记录发送到死信主题,并在侦听器的帮助下抛出指示BatchListenerFailedException哪个记录失败的信息。
然后,您必须针对该主题实现您自己的侦听器。
| 归档时间: |
|
| 查看次数: |
1571 次 |
| 最近记录: |