Flux.subscribe(Consumer<? super T> Consumer>) 和 Flux.doOnNext(Consumer<? super T> onNext) 之间的区别

Mik*_*ies 5 java project-reactor spring-webflux

刚刚开始了解 Reactor 的反应式编程,我从这里的教程Building-a-chat-application-with-angular-and-spring-reactive-websocket 中看到了这个代码片段

class ChatSocketHandler(val mapper: ObjectMapper) : WebSocketHandler {
    val sink = Sinks.replay<Message>(100);
    val outputMessages: Flux<Message> = sink.asFlux();
    override fun handle(session: WebSocketSession): Mono<Void> {
        println("handling WebSocketSession...")
        session.receive()
                .map { it.payloadAsText }
                .map { Message(id= UUID.randomUUID().toString(), body = it, sentAt = Instant.now()) }
                .doOnNext { println(it) }
                .subscribe(
                        { message: Message -> sink.next(message) },
                        { error: Throwable -> sink.error(error) }
                );
        return session.send(
                Mono.delay(Duration.ofMillis(100))
                        .thenMany(outputMessages.map { session.textMessage(toJson(it)) })
        )
    }
    fun toJson(message: Message): String = mapper.writeValueAsString(message)
}
Run Code Online (Sandbox Code Playgroud)

我理解它的作用,但不明白为什么作者在 subscribe 方法中使用消费者而不是链接另一个 doOnNext(consumer)。IE。行:

                .doOnNext { println(it) }
                .subscribe(
                        { message: Message -> sink.next(message) },
                        { error: Throwable -> sink.error(error) }
Run Code Online (Sandbox Code Playgroud)

从 Reactor 文档中我读到了 Flux.subscribe(Consumer <? super T> Consumer):

向该 Flux 订阅一个 Consumer,它将消耗序列中的所有元素。它将请求无限的需求(Long.MAX_VALUE)。

对于观察和转发传入数据的被动版本,请参阅 doOnNext(java.util.function.Consumer)。

然而,我不明白为什么人们会选择其中一个而不是另一个,对我来说,它们在功能上似乎是相同的。

Mic*_*rry 10

区别在于传统性而非功能性——区别在于副作用与最终消费者。

这一doOnXXX系列方法旨在应对反应链执行时用户设计的副作用 - 日志记录是其中最常见的方法,但您可能还需要在每个元素经过时查看指标、分析等。所有这些的关键是,将其中任何一个作为最终消费者都没有多大意义(例如println()上面示例中的)。

相反,subscribe()消费者应该是“最终消费者”,通常由框架(例如 Webflux)而不是用户代码调用 - 因此这种情况是该规则的一个例外。在这种情况下,他们主动将此反应链中的消息传递到另一个接收器以进行进一步处理 - 因此将其作为“副作用”样式方法没有多大意义,因为您不希望 Flux继续超越这一点。

(附录:如上所述,使用reactor / Webflux的正常方法是让Webflux处理订阅,这不是这里发生的情况。我还没有详细查看是否有更明智的方法来实现这一点而无需用户订阅,但根据我的经验,通常是这样,并且手动调用订阅通常会产生一些代码味道。您当然应该尽可能在自己的代码中避免它。)