Rob*_*rtz 1 java spring apache-kafka spring-boot spring-kafka
如何从与不同代理关联的多个 Kafka 主题中进行消费?
\n我有一个 Spring Boot 应用程序,需要使用 2 个主题,但这些主题与不同的代理相关联。
\n我使用 Spring Kafka 和 @Listener 注释,我发现有一些方法可以使用与同一代理而不是不同代理关联的 2 个主题。不幸的是,我在 Spring Boot 或 Spring Kafka 文档中看不到任何关于如何执行此操作的有用信息。
\n有几种方法可以做到这一点,不幸的是 Spring-Boot 和 Spring-Kafka 没有明确实现这一点的最佳实践。SO 中还有很多答案可以解决使用同一代理从多个主题进行消费的问题,但它并不总是那么简单。
\n解决此问题的最简单方法是在Kafka Listener 注释中添加属性参数:
\n@KafkaListener(topics = ["\\${topic-1-name}"], properties = ["bootstrap.servers=\\${bootstrap-server-1}"])\nfun topic1Listener(@Payload messages: List<String>, ack: Acknowledgment){\n // Do work\n}\n\n@KafkaListener(topics = ["\\${topic-2-name}"], properties = ["bootstrap.servers=\\${bootstrap-server-2}"])\nfun topic2Listener(@Payload messages: List<String>, ack: Acknowledgment){\n // Do work\n}\nRun Code Online (Sandbox Code Playgroud)\n我们在属性参数值中指定的任何键/值对都将覆盖 DefaultKafkaConsumerFactory 中的默认键/值。在这种情况下,我们将 bootstrap.servers 属性覆盖为每个主题的我们自己的特定引导服务器地址。
\n但是,我们仍然可以使用 \xe2\x80\x9cnice 的 Spring-Boot 功能来拥有 \xe2\x80\x9d,例如自动主题创建和允许 Spring-Boot 为我们的应用程序设置组 ID。我们只需将 group-id 参数保留在 application.properties 或 application.yml 文件中即可。
\nspring:\n kafka:\n consumer:\n group-id: group-id-of-your-choice\nRun Code Online (Sandbox Code Playgroud)\n还有很多其他方法可以实现此目的,但这是我发现并测试过的最简单的方法。
\n其他方法包括创建您自己的自定义 ConsumerFactory 和 KafkaListenerContainerFactory 对象,然后配置每个 Factory 中的属性以使用您选择的引导服务器。然而,第一种方法更干净、更简单,使用默认的容器工厂。以下是如何使用您自己的属性创建自定义工厂。
\n@Bean\nfun ConsumerFactory1(): DefaultKafkaConsumerFactory<String, String> {\n val props = mutableMapOf<String, Any>()\n props[ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG] = bootStrapServers1!!\n props[ConsumerConfig.GROUP_ID_CONFIG] = groupId!!\n props[ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG] = StringDeserializer::class.java\n props[ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG] = StringDeserializer::class.java\n return DefaultKafkaConsumerFactory(props)\n}\n\n@Bean\nfun ContainerFactory1(): ConcurrentKafkaListenerContainerFactory<String, String>? {\n val factory: ConcurrentKafkaListenerContainerFactory<String, String> = ConcurrentKafkaListenerContainerFactory()\n factory.consumerFactory = ConsumerFactory1()\n factory.containerProperties.ackMode = ContainerProperties.AckMode.MANUAL\n factory.isBatchListener = true\n return factory\n}\n\n@Bean\nfun ConsumerFactory2(): DefaultKafkaConsumerFactory<Any?, Any?> {\n val props = mutableMapOf<String, Any>()\n props[ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG] = bootStrapServers2!!\n props[ConsumerConfig.GROUP_ID_CONFIG] = groupId!!\n props[ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG] = StringDeserializer::class.java\n props[ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG] = StringDeserializer::class.java\n\n}\n\n@Bean\nfun ContainerFactory2(): ConcurrentKafkaListenerContainerFactory<String, String> {\n val factory: ConcurrentKafkaListenerContainerFactory<String, String> = ConcurrentKafkaListenerContainerFactory()\n factory.consumerFactory = ConsumerFactory2()\n factory.containerProperties.ackMode = ContainerProperties.AckMode.MANUAL\n factory.isBatchListener = true\n return factory\n}\nRun Code Online (Sandbox Code Playgroud)\n这里有一些需要解压的东西。
\n@KafkaListener(topics = ["\\${kafka-topic-1}"], containerFactory = "ContainerFactory1", groupId = "\\${kafka.group-id}")\nRun Code Online (Sandbox Code Playgroud)\n还有更多方法可以实现这一点,我知道其他人也提出了很好的解决方案,有时这完全取决于您的应用程序需要什么。这只是我发现的成功解决方案中的 2 个,方法 1 很容易理解、实现和测试,无需过多涉及 Spring-Boot 和 Spring-Kafka 的深度!如果您需要这样的功能,这些方法将适用于 2 个以上的代理。
\n