Ans*_*wal 5 java apache-kafka spring-kafka
我有一个 kafka 消费者 java 应用程序。我正在阅读该应用程序中的两个单独的卡夫卡主题。
对于主题1,有一个名为kafkaListenerContainerFactory的KafkaListenerContainerFactory,下面是代码片段。该消息采用 avro 格式。Pojo1 是使用 avro 模式构建的 pojo 类。
@Bean
public ConcurrentKafkaListenerContainerFactory<String, Pojo1> kafkaListenerContainerFactory() {
ConcurrentKafkaListenerContainerFactory<String, Pojo1> factory = new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(consumerFactory());
return factory;
}
Run Code Online (Sandbox Code Playgroud)
为了消费来自 topic1 的消息,我们有以下方法
@kafkaListener (topics="topic1") public boolean readTopic1(ConsumerRecord<String,Pojo1> record){ // logic }
现在这个应用程序也从 topic2 读取消息,它也有 avro 消息,而 Pojo2 是使用相应 avro 模式构建的 pojo 类。
要读取 topic2,我们有以下方法
@kafkaListener (topics="topic2") public boolean readTopic2(ConsumerRecord<String,Pojo2> record){ // logic }
现在我不知道从 topic2 消费消息是如何按预期工作的。
kafkaListenerContainerFactory 配置了 Pojo1,那么如何从 Pojo2 的 topic2 读取消息。据我所知,kafka 仅在缺少名称为“kafkaListenerContainerFactory”的 bean 时才会创建默认容器工厂,但在我的应用程序中,我们已经为 topic1 构建了一个 kafkaListenerContainerFactory。
根据,应该为Pojo2创建另一个KafkaListenerContainerFactory,并且在使用时应该给出参考@kafkaListener (topics="topic2", containerFactory="factoryForTopic2")
有人可以帮助我了解 KafkaListenerContainerFactory 如何处理具有不同消息模式的不同主题。
这称为类型擦除;泛型类型仅在编译时适用。在运行时,只要您的反序列化器创建正确的类型,它就可以工作。
使用起来可能更干净,ConcurrentKafkaListenerContainerFactory<String, Object>
以避免任何混淆;特别是当使用可以创建不同类型的 avro 或 json 反序列化器时。
也就是说,你的 bean 被称为importDecisionMessageKafkaListenerContainerFactory
. 对于使用非标准工厂的侦听器,必须在containerFactory
侦听器的属性中指定其名称;否则kafkaListenerContainerFactory
使用。
归档时间: |
|
查看次数: |
4987 次 |
最近记录: |