Spring Cloud Stream Kafka发送消息

1 functional-programming spring-boot spring-kafka spring-cloud-stream-binder-kafka

如何使用新的 Spring Cloud Stream Kafka 功能模型发送消息?

已弃用的方式如下所示。

public interface OutputTopic {

    @Output("output")
    MessageChannel output();
}

@Autowired
OutputTopic outputTopic;

public someMethod() {
    
    outputTopic.output().send(MessageBuilder.build());
}
Run Code Online (Sandbox Code Playgroud)

但是我怎样才能以功能风格发送消息呢?

应用程序.yml

spring:
  cloud:
    function:
      definition: process
    stream:
      bindings:
        process-out-0:
          destination: output
          binder: kafka
Run Code Online (Sandbox Code Playgroud)
@Configuration
public class Configuration {

    @Bean
    Supplier<Message<String>> process() {
        return () -> {
            return MessageBuilder.withPayload("foo")
                    .setHeader(KafkaHeaders.MESSAGE_KEY, "bar".getBytes()).build();
        };
    }
Run Code Online (Sandbox Code Playgroud)

我会自动装配 MessageChannel,但没有用于 process、process-out-0、输出或类似内容的 MessageChannel-Bean。或者我可以使用供应商 Bean 发送消息吗?有人可以给我举个例子吗?多谢!

Gar*_*ell 6

您可以使用StreamBridge或reactor API - 请参阅将任意数据发送到输出(例如外部事件驱动源)