如何在Spring Webflux/Reactor Netty Web应用程序中执行阻塞调用

Den*_*sen 6 spring-data-mongodb project-reactor reactor-netty spring-webflux

在我的用例中,我有一个带有Reactor Netty的Spring Webflux微服务,我有以下依赖项:

  • org.springframework.boot.spring-boot-starter-webflux (2.0.1.RELEASE)
  • org.springframework.boot.spring-boot-starter-data-mongodb-reactive (2.0.1.RELEASE)
  • org.projectreactor.reactor-spring (1.0.1.RELEASE)

对于一个非常具体的案例,我需要从Mongo数据库中检索一些信息,并将其处理成与我的被动发送的查询参数WebClient.由于WebClient并且UriComponentsBuilder接受发布者(Mono/Flux)我使用了一个#block()调用来接收结果.

由于reactor-core(版本0.7.6.RELEASE)已包含在最新spring-boot-dependencies版本(版本2.0.1.RELEASE)中,因此无法再使用:block()/blockFirst()/blockLast() are blocking, which is not supported in thread xxx,请参阅 - > https://github.com/reactor/reactor-netty/问题/ 312

我的代码片段:

public Mono<FooBar> getFooBar(Foo foo) {
    MultiValueMap<String, String> parameters = new LinkedMultiValueMap<>();
    parameters.add("size", foo.getSize());
    parameters.addAll("bars", barReactiveCrudRepository.findAllByIdentifierIn(foo.getBarIdentifiers()) // This obviously returns a Flux
        .map(Bar::toString)
        .collectList()
        .block());

    String url = UriComponentsBuilder.fromHttpUrl("https://base-url/")
        .port(8081)
        .path("/foo-bar")
        .queryParams(parameters)
        .build()
        .toString();

    return webClient.get()
        .uri(url)
        .retrieve()
        .bodyToMono(FooBar.class);
}
Run Code Online (Sandbox Code Playgroud)

这适用于spring-boot版本2.0.0.RELEASE,但自从升级到版本2.0.1.RELEASE并因此升级reactor-core到版本0.7.6.RELEASE后不再允许.

我看到的唯一真正的解决方案是包括一个块(非反应性)存储库/ mongo客户端,但我不确定是否鼓励这样做.有什么建议?

Bri*_*zel 6

WebClient不接受Publisher其请求的URL类型,但没有什么能阻止你做以下:

public Mono<FooBar> getFooBar(Foo foo) {

    Mono<List<String>> bars = barReactiveCrudRepository
        .findAllByIdentifierIn(foo.getBarIdentifiers())
        .map(Bar::toString)
        .collectList();

    Mono<FooBar> foobar = bars.flatMap(b -> {

        MultiValueMap<String, String> parameters = new LinkedMultiValueMap<>();
        parameters.add("size", foo.getSize());
        parameters.addAll("bars", b);

        String url = UriComponentsBuilder.fromHttpUrl("https://base-url/")
            .port(8081)
            .path("/foo-bar")
            .queryParams(parameters)
            .build()
            .toString();

        return webClient.get()
            .uri(url)
            .retrieve()
            .bodyToMono(FooBar.class);
    });
    return foobar;         
}
Run Code Online (Sandbox Code Playgroud)

如果有的话,这个新的反应器核心检查使您免于在 WebFlux 处理程序中间使用此阻塞调用使整个应用程序崩溃。