小编Ans*_*wal的帖子

默认的kafkaListenerContainerFactory是如何工作的

我有一个 kafka 消费者 java 应用程序。我正在阅读该应用程序中的两个单独的卡夫卡主题。

对于主题1,有一个名为kafkaListenerContainerFactory的KafkaListenerContainerFactory,下面是代码片段。该消息采用 avro 格式。Pojo1 是使用 avro 模式构建的 pojo 类。

 @Bean
public ConcurrentKafkaListenerContainerFactory<String, Pojo1> kafkaListenerContainerFactory() {

    ConcurrentKafkaListenerContainerFactory<String, Pojo1> factory = new ConcurrentKafkaListenerContainerFactory<>();
    factory.setConsumerFactory(consumerFactory());

    return factory;
}
Run Code Online (Sandbox Code Playgroud)

为了消费来自 topic1 的消息,我们有以下方法

@kafkaListener (topics="topic1") public boolean readTopic1(ConsumerRecord<String,Pojo1> record){ // logic }

现在这个应用程序也从 topic2 读取消息,它也有 avro 消息,而 Pojo2 是使用相应 avro 模式构建的 pojo 类。

要读取 topic2,我们有以下方法

@kafkaListener (topics="topic2") public boolean readTopic2(ConsumerRecord<String,Pojo2> record){ // logic }

现在我不知道从 topic2 消费消息是如何按预期工作的。

kafkaListenerContainerFactory 配置了 Pojo1,那么如何从 Pojo2 的 topic2 读取消息。据我所知,kafka 仅在缺少名称为“kafkaListenerContainerFactory”的 bean 时才会创建默认容器工厂,但在我的应用程序中,我们已经为 topic1 构建了一个 kafkaListenerContainerFactory。

根据,应该为Pojo2创建另一个KafkaListenerContainerFactory,并且在使用时应该给出参考 …

java apache-kafka spring-kafka

5
推荐指数
1
解决办法
4987
查看次数

标签 统计

apache-kafka ×1

java ×1

spring-kafka ×1