Spring 集成 WebFlux 错误处理

Abh*_*kar 5 spring spring-integration reactive-programming spring-webflux

SI 5+ 支持WebFlux,这意味着我们现在可以构建一个反应式消息系统。然而,这也意味着设计已经经过深思熟虑,通常的错误处理方法不起作用。在反应式流中,消息是Publisher( Flux),它不会抛出异常,但会发出错误通知。因此,消息上设置的错误通道标头是无用的,因为 SI 不知道导致Flux了错误。考虑以下代码:

.handle(WebFlux.outboundGateway(m -> m.getPayload().toString(), webClient)
    .expectedResponseType(YelpRecord.class)
    .httpMethod(GET)
    .mappedRequestHeaders(ACCEPT)
    .replyPayloadToFlux(true))
.handle((GenericHandler<Flux<YelpRecord>>) (flux, headers) ->
    flux
            .doOnError(t -> log.error(t.getMessage(), t))
            .doAfterTerminate(() ->
                    log.info("Completed streaming from: {}.", headers.get(DOWNLOAD_URI_HEADER))
            )
            .onBackpressureBuffer(
                    yelpArtifactoryProperties.getOnBackpressureBufferSize(),
                    BufferOverflowStrategy.ERROR)
)
Run Code Online (Sandbox Code Playgroud)

上面的代码片段中缺少的是将异常发送到来自 的消息上配置的错误通道doOnError。我们怎样才能做到这一点?

Art*_*lan 2

((MessageChannel) header.getErrorChannel()).send(...)那里适合你吗.doOnError()

关键是你是对的,Flux消息有效负载中的这一点已经不受框架的控制,如果你想处理它的错误,你必须自己做。这已经是您的代码,doOnError()因此框架无法帮助您完成一些自动的事情。