Spring-kafka 监听器并发

Mee*_*ack 4 java spring multithreading spring-kafka

我已经使用spring-kafka lib实现了 Kafka 消费者。我有一个带有 2 个分区的 Kafka 主题,并且我将ConcurrentKafkaListenerContainerFactory并发级别设置为 2,因此每个容器实例都应该根据 spring-kafka文档从单个分区中使用。

KafkaMessageListenerContainer 接收来自单个线程上所有主题/分区的所有消息。ConcurrentMessageListenerContainer 委托给 1 个或多个 KafkaMessageListenerContainer 以提供多线程消费。

有我的消费者类:

@Component
public class KafkaConsumer {
    private HashMap<String, LinkedBlockingQueue<Event>> hashMap = new HashMap<>();

    @KafkaListener(topics = "${kafka.topic}", groupId = "events_group")
    public void receive(ConsumerRecord<?, ?> record, Consumer consumer) throws InterruptedException {
        String message = record.value().toString();
        Event event = EventFactory.createEvent(message);
        String customerId = event.getAttributeStringValue(DefinedField.CUSTOMER_ID);
        // add event to hashMap
        LinkedBlockingQueue<Event> queue = hashMap.get(customerId);
        if (queue == null) {
            queue = new LinkedBlockingQueue<>();
            queue.add(event);
            hashMap.put(customerId, queue);
        } else {
            queue.add(event);
        }
    }
}
Run Code Online (Sandbox Code Playgroud)

如您所见,我有 'hashMap' 集合,我根据消息 'customer_id' 属性将我的事件放入相应的队列中。在多线程访问的情况下,此类功能需要额外的同步,正如我所见,spring-kafka 仅为所有容器创建一个 bean 实例,而不是为每个容器创建一个单独的 bean 实例,以避免并发问题。

如何以编程方式更改此逻辑?

我看到解决这个问题的唯一奇怪的方法是使用两个 JVM 运行一个单独的应用程序,其中包含单线程消费者,因此使用 #receive 方法访问 KafkaConsumer 类将是单线程的。

Art*_*lan 6

没错。它是如何工作的。框架实际上并不依赖于 bean,而是依赖于它向函数传递消息的方法。

您可以考虑为@KafkaListener主题中的每个分区使用两种方法。确实,来自一个分区的记录@KafkaListener在单个线程中传递到。所以,如果你真的不能忍受这种状态,你可以HashMap为每个线程使用两个。

侦听器抽象背后的一般思想正是关于无状态行为。那KafkaConsumer是常规的 Spring单例bean。您必须接受这个事实并根据这种情况重新设计您的解决方案。