使用 Spring Boot 创建 Kafka 主题

Rit*_*h S 3 apache-kafka spring-boot spring-kafka

我正在尝试使用 Spring Boot 在 Kafka 中创建一个新主题。

在谷歌搜索后,我得到了以下答案

@Configuration
public class KafkaTopicConfiguration {
  
  @Bean
  public NewTopic createTopic(Topic topic) {
    return TopicBuilder.name(topic.getTopicName())
      .partitions(topic.getPartitions())
      .replicas(topic.getReplicas())
      .build();
  }
}
Run Code Online (Sandbox Code Playgroud)

但我无法使用 Spring Boot REST POST 调用来实现以下内容。

    @PostMapping("/api/v1/kafkatopic/")
    public NewTopic createTopic(@RequestBody Topic topic)
    {
        return kt.createTopic(topic);
    }

//kt being object of the Configuration Class

Run Code Online (Sandbox Code Playgroud)

但是,如果主题详细信息已通过代码(硬编码)传递,则相同的代码可以工作,如下所示。

  @Bean
  public NewTopic createTopic() {
    return TopicBuilder.name("test-topic")
      .partitions(6)
      .replicas(3)
      .build();
  }
Run Code Online (Sandbox Code Playgroud)

有人可以帮忙吗?

提前致谢

Gar*_*ell 6

NewTopicbean 仅在应用程序初始化期间创建。在运行时调用 bean 工厂方法不会执行任何操作。

要动态创建主题,您需要使用AdminClient. Spring Boot 自动配置KafkaAdminbean。

您可以AdminClient使用其属性创建一个。

然后使用客户端创建您的主题。

try (AdminClient client = AdminClient.create(kafkaAdmin.getConfigurationProperties()) {
    ...
}
Run Code Online (Sandbox Code Playgroud)