K2m*_*J33 3 reactive-programming spring-boot project-reactor spring-webflux
在Project Reactor中完成文件下载后,我无法正确保存文件。
class HttpImageClientDownloader implements ImageClientDownloader {
private final ExchangeFunction exchangeFunction;
HttpImageClientDownloader() {
this.exchangeFunction = ExchangeFunctions.create(new ReactorClientHttpConnector());
}
@Override
public Mono<File> downloadImage(String url, Path destination) {
ClientRequest clientRequest = ClientRequest.create(HttpMethod.GET, URI.create(url)).build();
return exchangeFunction.exchange(clientRequest)
.map(clientResponse -> clientResponse.body(BodyExtractors.toDataBuffers()))
//.flatMapMany(clientResponse -> clientResponse.body(BodyExtractors.toDataBuffers()))
.flatMap(dataBuffer -> {
AsynchronousFileChannel fileChannel = createFile(destination);
return DataBufferUtils
.write(dataBuffer, fileChannel, 0)
.publishOn(Schedulers.elastic())
.doOnNext(DataBufferUtils::release)
.then(Mono.just(destination.toFile()));
});
}
private AsynchronousFileChannel createFile(Path path) {
try {
return AsynchronousFileChannel.open(path, StandardOpenOption.CREATE);
} catch (Exception e) {
throw new ImageDownloadException("Error while creating file: " + path, e);
}
}
}
Run Code Online (Sandbox Code Playgroud)
所以我的问题是:DataBufferUtils.write(dataBuffer,fileChannel,0)是否阻塞?
磁盘变慢怎么办?
第二个问题是发生ImageDownloadException时会发生什么,在doOnNext中,我想释放给定的数据缓冲区,这是进行此类操作的好地方吗?
我认为这行:
Run Code Online (Sandbox Code Playgroud).map(clientResponse -> clientResponse.body(BodyExtractors.toDataBuffers()))
可能会阻塞...
这是另一种(较短的)实现方法:
Flux<DataBuffer> data = this.webClient.get()
.uri("/greeting")
.retrieve()
.bodyToFlux(DataBuffer.class);
Path file = Files.createTempFile("spring", null);
WritableByteChannel channel = Files.newByteChannel(file, StandardOpenOption.WRITE);
Mono<File> result = DataBufferUtils.write(data, channel)
.map(DataBufferUtils::release)
.then(Mono.just(file));
Run Code Online (Sandbox Code Playgroud)
现在,DataBufferUtils::write操作不会阻塞,因为它们将非阻塞IO与通道一起使用。写入这样的通道意味着它将把所有可能的内容写入输出缓冲区(即可能写入全部DataBuffer或部分内容)。
使用Flux::map或是执行此Flux::doOnNext操作的正确位置。但是您是对的,如果发生错误,您仍然有责任释放当前缓冲区(以及所有剩余的缓冲区)。在Spring Framework中,我们可能需要改进一些地方,请注意SPR-16782。
我看不到您的上一个示例如何显示任何阻塞:所有方法都返回响应类型,没有一个在阻塞I / O。
| 归档时间: |
|
| 查看次数: |
1269 次 |
| 最近记录: |