如何在不同线程中处理@KafkaListener方法?

ip6*_*696 2 java apache-kafka spring-boot kafka-consumer-api spring-kafka

我在 Spring Boot 中有卡夫卡处理程序:

    @KafkaListener(topics = "topic-one", groupId = "response")
    public void listen(String response) {
        myService.processResponse(response);
    }
Run Code Online (Sandbox Code Playgroud)

例如,生产者每秒发送一条消息。但myService.processResponse工作10秒。我需要处理每条消息并myService.processResponse在新线程中开始。我可以创建我的执行者并将每个响应委托给它。但我认为 kafka 中还有其他配置可供使用。我找到了2个:

1)添加concurrency = "5"@KafkaListener注释 - 它似乎有效。但我不确定有多正确,因为我有第二种方法:

2)我可以创建ConcurrentKafkaListenerContainerFactory并设置它ConsumerFactory并且concurrency

我不明白这些方法之间的区别?concurrency = "5"只需添加到注释就足够了@KafkaListener还是我需要创建ConcurrentKafkaListenerContainerFactory

或者我根本不明白什么,还有其他方法吗?

Gar*_*ell 7

在管理提交的偏移量方面,使用执行器会使事情变得复杂;不推荐。

使用@KafkaListener,框架会ConcurrentKafkaListenerContainerFactory为您创建一个。

concurrency上注解只是为了方便;它会覆盖出厂设置。

这允许您将同一个工厂与多个侦听器一起使用,每个侦听器具有不同的并发性。

您可以使用启动属性设置容器并发(默认);该值被注释值覆盖;请参阅java文档...

/**
 * Override the container factory's {@code concurrency} setting for this listener. May
 * be a property placeholder or SpEL expression that evaluates to a {@link Number}, in
 * which case {@link Number#intValue()} is used to obtain the value.
 * <p>SpEL {@code #{...}} and property place holders {@code ${...}} are supported.
 * @return the concurrency.
 * @since 2.2
 */
String concurrency() default "";
Run Code Online (Sandbox Code Playgroud)


yur*_*s87 5

concurrency选项与并发处理同一消费者收到的消息无关。当您有多个消费者并且每个消费者处理自己的分区时,它适用于消费者组。

我相信,将处理传递给单独的线程非常复杂,Spring-Kafka 团队决定不“按设计”这样做。您甚至不需要深入研究 Spring-Kafka 就可以理解其中的原因。检查KafkaConsumer 的 检测消费者故障文档:

必须注意确保承诺的偏移量不会超出实际位置。通常,您必须禁用自动提交,并仅在线程完成处理记录后手动提交记录的已处理偏移量(取决于您需要的传递语义)。另请注意,您需要暂停分区,以便在线程完成处理先前返回的记录之前不会从轮询中接收新记录。