如何在Webflux应用程序中使Spring Cloud Stream消费者?

Mir*_*lav 6 reactive-programming apache-kafka project-reactor spring-cloud-stream spring-webflux

我有一个基于 Webflux 的微服务,它有一个简单的反应存储库:

    public interface NotificationRepository extends ReactiveMongoRepository<Notification, ObjectId> {
    }
Run Code Online (Sandbox Code Playgroud)

现在我想扩展这个微服务来使用来自 Kafka 的事件消息。然后该消息/事件将被保存到数据库中。

对于 Kafka 监听器,我使用了 Spring Cloud Stream。我创建了一些简单的消费者,它工作得很好 - 我能够使用消息并将其保存到数据库中。

    @Bean
    public Consumer<KStream<String, Event>> documents(NotificationRepository repository) {
        return input ->
                input.foreach((key, value) -> {
                    LOG.info("Received event, Key: {}, value: {}", key, value);
                    repository.save(initNotification(value)).subscribe();
                });
    }
Run Code Online (Sandbox Code Playgroud)

但这是连接 Spring Cloud Stream 消费者和反应式存储库的正确方法吗?看起来不像是我subscribe()最后必须打电话的时候。

我阅读了Spring Cloud Stream 文档(针对 3.0.0 版本),他们说

Native support for reactive programming - since v3.0.0 we no longer distribute spring-cloud-stream-reactive modules and instead relying on native reactive support provided by spring cloud function. For backward compatibility you can still bring spring-cloud-stream-reactive from previous versions.
Run Code Online (Sandbox Code Playgroud)

在这个演示视频中,他们还提到他们使用项目反应器提供反应式编程支持。所以我想有一种我不知道的方法。你能告诉我如何正确地做吗?

如果这一切听起来太愚蠢,我深表歉意,但我对 Spring Cloud Stream 和反应式编程非常陌生,并且没有找到很多描述这一点的文章。

zla*_*val 5

只需使用 Flux 作为消耗类型,如下所示:

@Bean
public Consumer<Flux<Message<Event>>> documents(NotificationRepository repository) {
    return input ->
            input
             .map(message-> /*map the necessary value like:*/ message.getPayload().getEventValue())
             .concatMap((value) -> repository.save(initNotification(value)))
             .subscribe();
}
Run Code Online (Sandbox Code Playgroud)

如果您使用Function空返回类型(Function<Flux<Message<Event>>, Mono<Void>>)而不是 Consumer,那么框架可以自动订阅。您必须Consumer手动订阅,因为框架没有对流的引用。但Consumer如果您订阅的不是存储库而是整个流,那就没问题了。