A_M*_*A_M 5 java spring apache-kafka spring-kafka
我想使用Spring Kafka API实现有状态的侦听器。
给定以下内容:
然后将创建“ n”个KafkaMessageListenerContainers。其中的每一个都有自己的KafkaConsumer,因此会有“ n”个消费者线程-每个消费者一个。
使用消息时,将使用轮询基础KafkaConsumer的同一线程来调用@KafkaListener方法。由于仅存在侦听器的实例,因此此侦听器必须是线程安全的,因为将有来自“ n”个线程的并发访问。
我不想考虑并发访问,而将状态保持在一个我只知道只能被一个线程访问的侦听器中。
如何使用Spring Kafka API为每个Kafka使用者创建一个单独的侦听器?
你是对的; 每个容器有一个侦听器实例(无论是否配置为 a@KafkaListener或MessageListener)。
一种解决方法是使用MessageListener具有 n 个KafkaMessageListenerContainerbean 范围的原型(每个 bean 有 1 个线程)。
然后,每个容器将获得自己的侦听器实例。
对于 POJO 抽象来说这是不可能的@KafkaListener。
不过,通常最好使用无状态 Bean。
编辑
我找到了另一种解决方法,使用SimpleThreadScope...
@SpringBootApplication
public class So51658210Application {
public static void main(String[] args) {
SpringApplication.run(So51658210Application.class, args);
}
@Bean
public ApplicationRunner runner(KafkaTemplate<String, String> template, ApplicationContext context,
KafkaListenerEndpointRegistry registry) {
return args -> {
template.send("so51658210", 0, "", "foo");
template.send("so51658210", 1, "", "bar");
template.send("so51658210", 2, "", "baz");
template.send("so51658210", 0, "", "foo");
template.send("so51658210", 1, "", "bar");
template.send("so51658210", 2, "", "baz");
};
}
@Bean
public ActualListener actualListener() {
return new ActualListener();
}
@Bean
@Scope("threadScope")
public ThreadScopedListener listener() {
return new ThreadScopedListener();
}
@Bean
public static CustomScopeConfigurer scoper() {
CustomScopeConfigurer configurer = new CustomScopeConfigurer();
configurer.addScope("threadScope", new SimpleThreadScope());
return configurer;
}
@Bean
public NewTopic topic() {
return new NewTopic("so51658210", 3, (short) 1);
}
public static class ActualListener {
@Autowired
private ObjectFactory<ThreadScopedListener> listener;
@KafkaListener(id = "foo", topics = "so51658210")
public void listen(String in, @Header(KafkaHeaders.RECEIVED_PARTITION_ID) int partition) {
this.listener.getObject().doListen(in, partition);
}
}
public static class ThreadScopedListener {
private void doListen(String in, @Header(KafkaHeaders.RECEIVED_PARTITION_ID) int partition) {
System.out.println(in + ":"
+ Thread.currentThread().getName() + ":"
+ this.hashCode() + ":"
+ partition);
}
}
}
Run Code Online (Sandbox Code Playgroud)
(容器并发数为3)。
它工作正常:
bar:foo-1-C-1:1678357802:1
foo:foo-0-C-1:1973858124:0
baz:foo-2-C-1:331135828:2
bar:foo-1-C-1:1678357802:1
foo:foo-0-C-1:1973858124:0
baz:foo-2-C-1:331135828:2
Run Code Online (Sandbox Code Playgroud)
唯一的问题是作用域不会自行清理(例如,当容器停止并且线程消失时)。这可能并不重要,具体取决于您的用例。
为了解决这个问题,我们需要容器的一些帮助(例如,当侦听器线程停止时在侦听器线程上发布一个事件)。GH-762。
| 归档时间: |
|
| 查看次数: |
763 次 |
| 最近记录: |