block()/ blockFirst()/ blockLast()在调用bodyToMono后阻塞错误AFTER exchange()

Dai*_*hiG 3 java reactive-programming project-reactor spring-webflux

我正在尝试使用Webflux将生成的文件流式传输到另一个位置,但是,如果文件的生成遇到错误,则api会返回成功,但是DTO会在生成文件而不是文件本身时详细说明错误.这是使用一个非常古老和设计不佳的api所以请原谅使用post和api设计.

来自api调用(exchange())的响应是ClientResponse.从这里我可以使用bodyToMono转换为ByteArrayResource,它可以流式传输到文件,或者,如果创建文件时出错,那么我也可以使用bodyToMono转换为DTO.但是,我似乎无法做任何事情或取决于ClientResponse标题的内容.

在运行时我得到一个IllegalStateException引起的

block()/ blockFirst()/ blockLast()是阻塞,在线程reactor-http-client-epoll-12中不支持

我认为我的问题是我不能在同一个函数链中两次调用block().

我的代码片段是这样的:

webClient.post()
        .uri(uriBuilder -> uriBuilder.path("/file/")
                                      .queryParams(params).build())
        .exchange()
        .doOnSuccess(cr -> {
                if (MediaType.APPLICATION_JSON_UTF8.equals(cr.headers().contentType().get())) {
                    NoPayloadResponseDto dto = cr.bodyToMono(NoPayloadResponseDto.class).block();
                    createErrorFile(dto);
                }
                else {
                    ByteArrayResource bAr = cr.bodyToMono(ByteArrayResource.class).block();
                    createSpreadsheet(bAr);
                }
            }
        )
        .block();
Run Code Online (Sandbox Code Playgroud)

基本上我想根据标头中定义的MediaType以不同方式处理ClientResponse.

这可能吗?

ade*_*nor 13

正如投票最多的答案所述,永远不应该阻止。就我而言,这是唯一的选择,因为我们在命令式代码中使用反应式库。阻塞可以通过将单声道包装在处理器中来完成:

myMono.toProcessor().block()
Run Code Online (Sandbox Code Playgroud)

  • `myMono.toFuture().get();` 永远持续下去,应用程序永远不会执行下一行代码。我在帖子中尝试过,获得了多个工作网址。任何想法? (6认同)
  • 这是一个已弃用的方法。文档要求我们使用共享 (5认同)

Bri*_*zel 11

首先,一些有助于您理解解决此用例的代码段的事情.

  1. 你永远不应该在返回一个被动类型的方法中调用阻塞方法; 你将阻止应用程序的少数线程之一,这对应用程序来说非常糟糕
  2. 无论如何,从Reactor 3.2开始,在反应式管道中阻塞会引发错误
  3. subscribe如评论中所述,召唤也不是一个好主意.它或多或少像在单独的线程中将该作业作为任务启动.完成后你会得到一个回调(subscribe方法可以给lambda),但实际上你正在将当前管道与该任务分离.在这种情况下,在您有机会读取完整的响应主体以将其写入文件之前,可以关闭客户端HTTP响应并清理资源
  4. 如果你不想在内存中缓冲整个响应,Spring会提供DataBuffer(想想可以合并的ByteBuffer实例).
  5. 如果您正在实现的方法本身阻塞(void例如返回),则可以调用block ,例如在测试用例中.

这是您可以用来执行此操作的代码段:

Mono<Void> fileWritten = WebClient.create().post()
        .uri(uriBuilder -> uriBuilder.path("/file/").build())
        .exchange()
        .flatMap(response -> {
            if (MediaType.APPLICATION_JSON_UTF8.equals(response.headers().contentType().get())) {
                Mono<NoPayloadResponseDto> dto = response.bodyToMono(NoPayloadResponseDto.class);
                return createErrorFile(dto);
            }
            else {
                Flux<DataBuffer> body = response.bodyToFlux(DataBuffer.class);
                return createSpreadsheet(body);
            }
        });
// Once you get that Mono, you should give plug it into an existing
// reactive pipeline, or call block on it, depending on the situation
Run Code Online (Sandbox Code Playgroud)

正如你所看到的,我们并没有在任何地方阻塞,并且处理I/O的方法正在返回Mono<Void>,这是一个done(error)回调的反应,它反映了事情何时完成以及是否发生了错误.

由于我不确定该createErrorFile方法应该做什么,我提供了一个示例createSpreadsheet,只是将正文字节写入文件.请注意,由于数据库可能被回收/合并,我们需要在完成后释放它们.

private Mono<Void> createSpreadsheet(Flux<DataBuffer> body) {
    try {
        Path file = //...
        WritableByteChannel channel = Files.newByteChannel(file, StandardOpenOption.WRITE);
        return DataBufferUtils.write(body, channel).map(DataBufferUtils::release).then();
    } catch (IOException exc) {
        return Mono.error(exc);
    }
}
Run Code Online (Sandbox Code Playgroud)

通过这种实现,您的应用程序将DataBuffer在给定时间在内存中保留一些实例(由于性能原因,反应式运算符正在预取值),并且会以反应方式写入字节.


Ant*_*kin 7

要在服务器请求池之外执行客户端请求,请使用 myWebClientMono.share().block();

  • share() 到底做了什么? (5认同)
  • 它将在当前工作池之外执行阻塞调用。 (3认同)