Spring Cloud Stream Function :通过 REST 调用调用 Function<T,R> 并将其输出到 KAFKA 主题

Ang*_*wal 2 java java-8 spring-cloud-stream spring-cloud-function

我有简单的@Bean(Java 8 函数),它们映射到目的地topic-out-in)。

@Bean
public Function<String, String> transform() {
    return payload -> payload.toUpperCase();
}

@Bean
public Consumer<String> receive() {
    return payload -> logger.info("Data received: " + payload);
}
Run Code Online (Sandbox Code Playgroud)

.yml配置:

spring:
  cloud:
    stream:
      function:
        definition: transform;receive
      bindings:
        transform-out-0:
          destination: myTopic
        receive-in-0:
          destination: myTopic

Run Code Online (Sandbox Code Playgroud)

现在,我想transform通过调用来调用该函数REST,以便它的输出转到destination topic(即transform-out-0映射到)并由该目的地(映射到)myTopic拾取。基本上,每个 REST 调用都应该生成一个新的KAFKA实例并关闭它。consumerreceive-in-0myTopic Producer

我怎样才能做到这一点,请使用spring-cloud-stream

谢谢

安舒曼

sob*_*cko 7

您应该使用StreamBridge而不是拥有该transform功能。这是 Spring Cloud Stream 中动态目标的新推荐方法。这是基本思想:

@Autowired
private StreamBridge streamBridge;

@RequestMapping
public void delegateToSupplier(@RequestBody String body) {
    streamBridge.send("transform-out-0", body);
}
Run Code Online (Sandbox Code Playgroud)

然后通过配置提供此属性 - spring.cloud.stream.source: transform

transform-out-0Spring Cloud Stream 将创建一个需要您调用的输出绑定。每次调用 REST 端点时,通过StreamBridge,您都会将数据发送到目标主题。

有关更多信息,请参阅