启用 @KafkaListener 从 application.yml 文件中获取可变主题名称

ter*_*abl 8 spring-boot spring-kafka

我试图将多个主题加载到一个主题中,@KafkaListener但遇到了麻烦,因为我相信它正在寻找一个常量值,但是topicsapplication.yml文件中初始化变量会导致一些问题,我想知道是否有人可以帮助我解决这个问题,或者指导我如何将多个 Kafka 主题加载到单个 KafkaListener 中。

@KafkaListener通过将它们传递到逗号分隔的对象中,我可以收听相同的多个主题,如下所示:

@KafkaListener(topics = {
           "flight-events",
           "flight-time-events",
           "service-events",
           "flight-delay-events"
   })
Run Code Online (Sandbox Code Playgroud)

我意识到我可以用逗号分隔的值来表示主题,但我希望能够通过配置文件添加主题,而不是更改代码库中的代码。

我相信可能存在的问题是@KafkaListener 需要接受一个常量值,而我无法将注释定义为常量,有什么办法可以解决这个问题吗?

KafkaWebSocketConnector.java

@Component
public class KafkaWebSocketConnector
{


   @Value("${spring.kafka.topics}")
   private String[] topics;

   @KafkaListener(topics = topics)
   public void listen(ConsumerRecord<?, Map<String, String>> message)
   {
      log.info("Received messages on topic [{}]: [{}]", message.topic(), message.value());
      String dest = "/" + message.topic();
      log.info("destination = {}", dest);
      log.info("msg: {}", message);
      messageTemplate.convertAndSend(dest, message.value());
   }
}
Run Code Online (Sandbox Code Playgroud)

application.yml

spring:
  kafka:
    consumer:
      auto-offset-reset: earliest
      group-id: kafka-websocket-connector
    topics: flight-events,
      flight-time-events,
      canceled-events,
      pax-events,
      flight-delay-events
Run Code Online (Sandbox Code Playgroud)

ter*_*abl 19

@Gary Russell 从这个 GitHub 问题提供的答案:

https://github.com/spring-projects/spring-kafka/issues/361

您可以使用 SpEL 表达式;EnableKafkaIntegrationTests 中有一个示例...

@KafkaListener(id = "foo", topics = "#{'${topicOne:annotated1,foo}'.split(',')}")
Run Code Online (Sandbox Code Playgroud)

就我而言 "#{'${spring.kafka.topics}'.split(',')}"

为了回答上述问题,我能够实现上述代码(由 Gary Russell 提供)。

  • 如果您不需要多个主题监听器,则表达式可能会更简单:`@KafkaListener(id = "myListener",topics = "${topics.myTopicName}"` (2认同)