Mee*_*ack 4 java spring multithreading spring-kafka
我已经使用spring-kafka lib实现了 Kafka 消费者。我有一个带有 2 个分区的 Kafka 主题,并且我将ConcurrentKafkaListenerContainerFactory并发级别设置为 2,因此每个容器实例都应该根据 spring-kafka文档从单个分区中使用。
KafkaMessageListenerContainer 接收来自单个线程上所有主题/分区的所有消息。ConcurrentMessageListenerContainer 委托给 1 个或多个 KafkaMessageListenerContainer 以提供多线程消费。
有我的消费者类:
@Component
public class KafkaConsumer {
private HashMap<String, LinkedBlockingQueue<Event>> hashMap = new HashMap<>();
@KafkaListener(topics = "${kafka.topic}", groupId = "events_group")
public void receive(ConsumerRecord<?, ?> record, Consumer consumer) throws InterruptedException {
String message = record.value().toString();
Event event = EventFactory.createEvent(message);
String customerId = event.getAttributeStringValue(DefinedField.CUSTOMER_ID);
// add event to hashMap
LinkedBlockingQueue<Event> queue = hashMap.get(customerId);
if (queue == null) {
queue = new LinkedBlockingQueue<>();
queue.add(event);
hashMap.put(customerId, queue);
} else {
queue.add(event);
}
}
}
Run Code Online (Sandbox Code Playgroud)
如您所见,我有 'hashMap' 集合,我根据消息 'customer_id' 属性将我的事件放入相应的队列中。在多线程访问的情况下,此类功能需要额外的同步,正如我所见,spring-kafka 仅为所有容器创建一个 bean 实例,而不是为每个容器创建一个单独的 bean 实例,以避免并发问题。
如何以编程方式更改此逻辑?
我看到解决这个问题的唯一奇怪的方法是使用两个 JVM 运行一个单独的应用程序,其中包含单线程消费者,因此使用 #receive 方法访问 KafkaConsumer 类将是单线程的。
没错。它是如何工作的。框架实际上并不依赖于 bean,而是依赖于它向函数传递消息的方法。
您可以考虑为@KafkaListener主题中的每个分区使用两种方法。确实,来自一个分区的记录@KafkaListener在单个线程中传递到。所以,如果你真的不能忍受这种状态,你可以HashMap为每个线程使用两个。
侦听器抽象背后的一般思想正是关于无状态行为。那KafkaConsumer是常规的 Spring单例bean。您必须接受这个事实并根据这种情况重新设计您的解决方案。
| 归档时间: |
|
| 查看次数: |
5181 次 |
| 最近记录: |