默认的kafkaListenerContainerFactory是如何工作的

Ans*_*wal 5 java apache-kafka spring-kafka

我有一个 kafka 消费者 java 应用程序。我正在阅读该应用程序中的两个单独的卡夫卡主题。

对于主题1,有一个名为kafkaListenerContainerFactory的KafkaListenerContainerFactory,下面是代码片段。该消息采用 avro 格式。Pojo1 是使用 avro 模式构建的 pojo 类。

 @Bean
public ConcurrentKafkaListenerContainerFactory<String, Pojo1> kafkaListenerContainerFactory() {

    ConcurrentKafkaListenerContainerFactory<String, Pojo1> factory = new ConcurrentKafkaListenerContainerFactory<>();
    factory.setConsumerFactory(consumerFactory());

    return factory;
}
Run Code Online (Sandbox Code Playgroud)

为了消费来自 topic1 的消息,我们有以下方法

@kafkaListener (topics="topic1") public boolean readTopic1(ConsumerRecord<String,Pojo1> record){ // logic }

现在这个应用程序也从 topic2 读取消息,它也有 avro 消息,而 Pojo2 是使用相应 avro 模式构建的 pojo 类。

要读取 topic2,我们有以下方法

@kafkaListener (topics="topic2") public boolean readTopic2(ConsumerRecord<String,Pojo2> record){ // logic }

现在我不知道从 topic2 消费消息是如何按预期工作的。

kafkaListenerContainerFactory 配置了 Pojo1,那么如何从 Pojo2 的 topic2 读取消息。据我所知,kafka 仅在缺少名称为“kafkaListenerContainerFactory”的 bean 时才会创建默认容器工厂,但在我的应用程序中,我们已经为 topic1 构建了一个 kafkaListenerContainerFactory。

根据,应该为Pojo2创建另一个KafkaListenerContainerFactory,并且在使用时应该给出参考@kafkaListener (topics="topic2", containerFactory="factoryForTopic2")

有人可以帮助我了解 KafkaListenerContainerFactory 如何处理具有不同消息模式的不同主题。

Gar*_*ell 6

这称为类型擦除;泛型类型仅在编译时适用。在运行时,只要您的反序列化器创建正确的类型,它就可以工作。

使用起来可能更干净,ConcurrentKafkaListenerContainerFactory<String, Object>以避免任何混淆;特别是当使用可以创建不同类型的 avro 或 json 反序列化器时。

也就是说,你的 bean 被称为importDecisionMessageKafkaListenerContainerFactory. 对于使用非标准工厂的侦听器,必须在containerFactory侦听器的属性中指定其名称;否则kafkaListenerContainerFactory使用。