使用Project Reactor中的ExchangeFunction从ClientRequest下载文件并保存文件

K2m*_*J33 3 reactive-programming spring-boot project-reactor spring-webflux

在Project Reactor中完成文件下载后,我无法正确保存文件。

class HttpImageClientDownloader implements ImageClientDownloader {

    private final ExchangeFunction exchangeFunction;

    HttpImageClientDownloader() {
        this.exchangeFunction = ExchangeFunctions.create(new ReactorClientHttpConnector());
    }

    @Override
    public Mono<File> downloadImage(String url, Path destination) {

        ClientRequest clientRequest = ClientRequest.create(HttpMethod.GET, URI.create(url)).build();
        return exchangeFunction.exchange(clientRequest)
                .map(clientResponse -> clientResponse.body(BodyExtractors.toDataBuffers()))
                //.flatMapMany(clientResponse -> clientResponse.body(BodyExtractors.toDataBuffers()))
                .flatMap(dataBuffer -> {

                    AsynchronousFileChannel fileChannel = createFile(destination);
                    return DataBufferUtils
                            .write(dataBuffer, fileChannel, 0)
                            .publishOn(Schedulers.elastic())
                            .doOnNext(DataBufferUtils::release)
                            .then(Mono.just(destination.toFile()));


                });

    }

    private AsynchronousFileChannel createFile(Path path) {
        try {
            return AsynchronousFileChannel.open(path, StandardOpenOption.CREATE);
        } catch (Exception e) {
            throw new ImageDownloadException("Error while creating file: " + path, e);
        }
    }
}
Run Code Online (Sandbox Code Playgroud)

所以我的问题是:DataBufferUtils.write(dataBuffer,fileChannel,0)是否阻塞?

磁盘变慢怎么办?

第二个问题是发生ImageDownloadException时会发生什么,在doOnNext中,我想释放给定的数据缓冲区,这是进行此类操作的好地方吗?

我认为这行:

            .map(clientResponse -> clientResponse.body(BodyExtractors.toDataBuffers()))
Run Code Online (Sandbox Code Playgroud)

可能会阻塞...

Bri*_*zel 5

这是另一种(较短的)实现方法:

Flux<DataBuffer> data = this.webClient.get()
        .uri("/greeting")
        .retrieve()
        .bodyToFlux(DataBuffer.class);

Path file = Files.createTempFile("spring", null);
WritableByteChannel channel = Files.newByteChannel(file, StandardOpenOption.WRITE);
Mono<File> result = DataBufferUtils.write(data, channel)
        .map(DataBufferUtils::release)
        .then(Mono.just(file));
Run Code Online (Sandbox Code Playgroud)

现在,DataBufferUtils::write操作不会阻塞,因为它们将非阻塞IO与通道一起使用。写入这样的通道意味着它将把所有可能的内容写入输出缓冲区(即可能写入全部DataBuffer或部分内容)。

使用Flux::map或是执行此Flux::doOnNext操作的正确位置。但是您是对的,如果发生错误,您仍然有责任释放当前缓冲区(以及所有剩余的缓冲区)。在Spring Framework中,我们可能需要改进一些地方,请注意SPR-16782

我看不到您的上一个示例如何显示任何阻塞:所有方法都返回响应类型,没有一个在阻塞I / O。