小编yfl*_*yfl的帖子

Kafka消费者异常和抵消提交

我一直在尝试为Spring Kafka做一些POC工作.具体来说,我想尝试在Kafka中消费消息时处理错误方面的最佳实践.

我想知道是否有人能够提供帮助:

  1. 分享围绕Kafka消费者在发生故障时应该做的最佳实践
  2. 帮助我了解AckMode Record如何工作,以及如何在侦听器方法中抛出异常时阻止对Kafka偏移队列的提交.

2的代码示例如下:

鉴于AckMode设置为RECORD,根据文档:

处理记录后,侦听​​器返回时提交偏移量.

我认为如果监听器方法抛出异常,偏移量不会增加.但是,当我使用下面的代码/配置/命令组合测试它时,情况并非如此.偏移量仍会更新,并继续处理下一条消息.

我的配置:

    private Map<String, Object> producerConfigs() {
    Map<String, Object> props = new HashMap<>();
    props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.0.1:9092");
    props.put(ProducerConfig.RETRIES_CONFIG, 0);
    props.put(ProducerConfig.BATCH_SIZE_CONFIG, 16384);
    props.put(ProducerConfig.LINGER_MS_CONFIG, 1);
    props.put(ProducerConfig.BUFFER_MEMORY_CONFIG, 33554432);
    props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, IntegerSerializer.class);
    props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
    return props;
}

   @Bean
ConcurrentKafkaListenerContainerFactory<Integer, String> kafkaListenerContainerFactory() {
    ConcurrentKafkaListenerContainerFactory<Integer, String> factory =
            new ConcurrentKafkaListenerContainerFactory<>();
    factory.setConsumerFactory(new DefaultKafkaConsumerFactory<>(consumerConfigs()));
    factory.getContainerProperties().setAckMode(AbstractMessageListenerContainer.AckMode.RECORD);
    return factory;
}
Run Code Online (Sandbox Code Playgroud)

我的代码:

@Component
public class KafkaMessageListener{
    @KafkaListener(topicPartitions = {@TopicPartition( topic = "my-replicated-topic", partitionOffsets = @PartitionOffset(partition = "0", initialOffset = "0", relativeToCurrent = "true"))}) …
Run Code Online (Sandbox Code Playgroud)

java spring apache-kafka kafka-consumer-api spring-kafka

13
推荐指数
1
解决办法
6823
查看次数

Spring Kafka:ApplicationContext中不同对象的多个侦听器

我可以请与社区核实听取多个主题的最佳方式是什么,每个主题包含不同类的消息?

在过去的几天里,我一直在玩Spring Kafka.到目前为止我的思考过程:

  • 因为在初始化KafkaListenerContainerFactory时需要将反序列化器传递给DefaultKafkaConsumerFactory.这似乎表明,如果我需要多个容器,每个反序列化一个不同类型的消息,我将无法使用@EnableKafka和@KafkaListener注释.

  • 这使我认为这样做的唯一方法是实例化多个KafkaMessageListenerContainer.

  • 鉴于KafkaMessageListenerContainers是单线程的,我需要同时监听多个主题,我真的应该使用多个ConcurrentKafkaMessageListenerContainers.

我会在这里走上正轨吗?有一个更好的方法吗?

谢谢!

java apache-kafka spring-kafka

8
推荐指数
3
解决办法
9528
查看次数