如何从与不同代理关联的多个 Kafka 主题中进行消费?

Rob*_*rtz 1 java spring apache-kafka spring-boot spring-kafka

如何从与不同代理关联的多个 Kafka 主题中进行消费?

\n

我有一个 Spring Boot 应用程序,需要使用 2 个主题,但这些主题与不同的代理相关联。

\n

我使用 Spring Kafka 和 @Listener 注释,我发现有一些方法可以使用与同一代理而不是不同代理关联的 2 个主题。不幸的是,我在 Spring Boot 或 Spring Kafka 文档中看不到任何关于如何执行此操作的有用信息。

\n

Rob*_*rtz 6

有几种方法可以做到这一点,不幸的是 Spring-Boot 和 Spring-Kafka 没有明确实现这一点的最佳实践。SO 中还有很多答案可以解决使用同一代理从多个主题进行消费的问题,但它并不总是那么简单。

\n

方法一

\n

解决此问题的最简单方法是在Kafka Listener 注释中添加属性参数:

\n
@KafkaListener(topics = ["\\${topic-1-name}"], properties = ["bootstrap.servers=\\${bootstrap-server-1}"])\nfun topic1Listener(@Payload messages: List<String>, ack: Acknowledgment){\n    // Do work\n}\n\n@KafkaListener(topics = ["\\${topic-2-name}"], properties = ["bootstrap.servers=\\${bootstrap-server-2}"])\nfun topic2Listener(@Payload messages: List<String>, ack: Acknowledgment){\n    // Do work\n}\n
Run Code Online (Sandbox Code Playgroud)\n

我们在属性参数值中指定的任何键/值对都将覆盖 DefaultKafkaConsumerFactory 中的默认键/值。在这种情况下,我们将 bootstrap.servers 属性覆盖为每个主题的我们自己的特定引导服务器地址。

\n

但是,我们仍然可以使用 \xe2\x80\x9cnice 的 Spring-Boot 功能来拥有 \xe2\x80\x9d,例如自动主题创建和允许 Spring-Boot 为我们的应用程序设置组 ID。我们只需将 group-id 参数保留在 application.properties 或 application.yml 文件中即可。

\n
spring:\n  kafka:\n    consumer:\n      group-id: group-id-of-your-choice\n
Run Code Online (Sandbox Code Playgroud)\n
    \n
  • 请注意,我们可以为两个消费者使用相同的组 ID,即使它们可能跨越多个代理。实际上,为整个应用程序设置 1 个组 ID 是一种很好的做法,这样监控消费者延迟以及其他指标就变得很简单。
  • \n
  • 另请注意,我们不再将主题名称存储在 Spring 配置部分中,我们需要在其他地方执行此操作,因为我们不希望 Spring-Boot 使用不正确的代理地址配置我们的主题。当我们覆盖属性时,我们让监听器处理该部分,如上所示。
  • \n
\n

还有很多其他方法可以实现此目的,但这是我发现并测试过的最简单的方法。

\n

方法二

\n

其他方法包括创建您自己的自定义 ConsumerFactory 和 KafkaListenerContainerFactory 对象,然后配置每个 Factory 中的属性以使用您选择的引导服务器。然而,第一种方法更干净、更简单,使用默认的容器工厂。以下是如何使用您自己的属性创建自定义工厂。

\n
@Bean\nfun ConsumerFactory1(): DefaultKafkaConsumerFactory<String, String> {\n        val props = mutableMapOf<String, Any>()\n        props[ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG] = bootStrapServers1!!\n        props[ConsumerConfig.GROUP_ID_CONFIG] = groupId!!\n        props[ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG] = StringDeserializer::class.java\n        props[ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG] = StringDeserializer::class.java\n        return DefaultKafkaConsumerFactory(props)\n}\n\n@Bean\nfun ContainerFactory1(): ConcurrentKafkaListenerContainerFactory<String, String>? {\n        val factory: ConcurrentKafkaListenerContainerFactory<String, String> = ConcurrentKafkaListenerContainerFactory()\n        factory.consumerFactory = ConsumerFactory1()\n        factory.containerProperties.ackMode = ContainerProperties.AckMode.MANUAL\n        factory.isBatchListener = true\n        return factory\n}\n\n@Bean\nfun ConsumerFactory2(): DefaultKafkaConsumerFactory<Any?, Any?> {\n        val props = mutableMapOf<String, Any>()\n        props[ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG] = bootStrapServers2!!\n        props[ConsumerConfig.GROUP_ID_CONFIG] = groupId!!\n        props[ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG] = StringDeserializer::class.java\n        props[ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG] = StringDeserializer::class.java\n\n}\n\n@Bean\nfun ContainerFactory2(): ConcurrentKafkaListenerContainerFactory<String, String> {\n        val factory: ConcurrentKafkaListenerContainerFactory<String, String> = ConcurrentKafkaListenerContainerFactory()\n        factory.consumerFactory = ConsumerFactory2()\n        factory.containerProperties.ackMode = ContainerProperties.AckMode.MANUAL\n        factory.isBatchListener = true\n        return factory\n}\n
Run Code Online (Sandbox Code Playgroud)\n

这里有一些需要解压的东西。

\n
    \n
  • 容器工厂本质上是相同的,我们只是使用各自的消费者工厂以及每个工厂的相关属性。
  • \n
  • 我使用内置的 StringDeserializer,因为我的应用程序将消息作为字符串使用,然后使用 Jackson 将 Json 字符串序列化为对象。您的应用程序可能需要不同的解串器,甚至需要自定义解串器,具体取决于该主题的数据序列化方式。
  • \n
  • 将 AckMode 设置为 MANUAL 允许我们在确认已消费来自主题的消息时进行控制。
  • \n
  • 将批处理侦听器设置为 true 允许我们的侦听器批量侦听消息,而不是一次侦听 1 个消息。
  • \n
  • 通过此实现,我们在使用 Kafka 方面完全将 Spring-Boot 从我们的应用程序中剥离出来。所以我们的 @Listener 注释看起来会有点不同:
  • \n
\n
@KafkaListener(topics = ["\\${kafka-topic-1}"], containerFactory = "ContainerFactory1", groupId = "\\${kafka.group-id}")\n
Run Code Online (Sandbox Code Playgroud)\n
    \n
  • 我们不再让 Spring-Boot 为我们配置 Group-Id,因此我们现在需要在监听器中指定它。这意味着您的 application.properties 文件中不再定义 Spring.Kafka.Consumer 属性,我们需要以编程方式执行此操作。我们现在需要手动配置一些其他东西,例如启动时自动配置主题,如果您需要该功能,则需要手动设置KafkaAdmin bean
  • \n
\n

结论

\n

还有更多方法可以实现这一点,我知道其他人也提出了很好的解决方案,有时这完全取决于您的应用程序需要什么。这只是我发现的成功解决方案中的 2 个,方法 1 很容易理解、实现和测试,无需过多涉及 Spring-Boot 和 Spring-Kafka 的深度!如果您需要这样的功能,这些方法将适用于 2 个以上的代理。

\n