我有一个 kafka 消费者 java 应用程序。我正在阅读该应用程序中的两个单独的卡夫卡主题。
对于主题1,有一个名为kafkaListenerContainerFactory的KafkaListenerContainerFactory,下面是代码片段。该消息采用 avro 格式。Pojo1 是使用 avro 模式构建的 pojo 类。
@Bean
public ConcurrentKafkaListenerContainerFactory<String, Pojo1> kafkaListenerContainerFactory() {
ConcurrentKafkaListenerContainerFactory<String, Pojo1> factory = new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(consumerFactory());
return factory;
}
Run Code Online (Sandbox Code Playgroud)
为了消费来自 topic1 的消息,我们有以下方法
@kafkaListener (topics="topic1") public boolean readTopic1(ConsumerRecord<String,Pojo1> record){ // logic }
现在这个应用程序也从 topic2 读取消息,它也有 avro 消息,而 Pojo2 是使用相应 avro 模式构建的 pojo 类。
要读取 topic2,我们有以下方法
@kafkaListener (topics="topic2") public boolean readTopic2(ConsumerRecord<String,Pojo2> record){ // logic }
现在我不知道从 topic2 消费消息是如何按预期工作的。
kafkaListenerContainerFactory 配置了 Pojo1,那么如何从 Pojo2 的 topic2 读取消息。据我所知,kafka 仅在缺少名称为“kafkaListenerContainerFactory”的 bean 时才会创建默认容器工厂,但在我的应用程序中,我们已经为 topic1 构建了一个 kafkaListenerContainerFactory。
根据,应该为Pojo2创建另一个KafkaListenerContainerFactory,并且在使用时应该给出参考 …