如何以非常通用的方式被动地将有效负载发送到 Kafka 主题?

Ano*_*ala 3 reactive-programming project-reactor spring-cloud-stream spring-webflux

我们的代码是完全反应式的,使用 Webflux。我们想将消息发送到 Kafka 队列,并希望以被动方式进行。

我看到有一个 spring-cloud-stream-reactive 依赖项,它支持 Reactive 功能,但对于它的数据流的特定用例。

Spring Cloud Stream 还支持使用反应式 API,其中传入和传出数据作为连续数据流处理。

我知道这里有一种方法可以做到:https : //projectreactor.io/docs/kafka/release/reference/

但它看起来与 kafka 特定的库紧密耦合。我喜欢这里的松耦合。

有没有办法以我的代码不耦合到 kafka 库的方式来做到这一点?

Ole*_*sky 5

因此,在 spring-cloud-stream 2.x 中,我们逐渐弃用反应式模块,转而使用spring-cloud-function提供开箱即用的反应式支持。这是一个更简单且耦合更少的编程模型。这里有更多细节

基本上在上面链接的示例中,您只需要修改函数本身

@Bean
public Function<Flux<String>, Flux<String>> toUpperCase() {
    return flux -> flux.map(value -> value.toUpperCase());
}
Run Code Online (Sandbox Code Playgroud)

Spring Cloud Stream 和 Spring Cloud Function 将负责订阅、绑定等。这里有更多关于Spring Cloud Stream 中函数式编程模型的信息。