我有一个旧库,必须使用它来检索文件。这个遗留库不会像您通常期望的那样在 InputStream 中返回读取内容,但它期望它传递一个开放的 OutputStream,以便它可以写入。
我必须编写一个 Webflux REST 服务,将该 OutputStream 写入 org.springframework.web.reactive.function.server.ServerResponse 主体。
legacyLib.BlobRead(outputStream); // writes the stream to an outputstream, that has to be provided by me, and somehow has to end up in the ServerResponse
Run Code Online (Sandbox Code Playgroud)
由于我想将 Stream 直接传递给 ServerResponse,我可能必须做这样的事情,对吧?
ServerResponse.ok().body(magicOutpuStreamToFluxConverter(), DataBuffer.class);
Run Code Online (Sandbox Code Playgroud)
这是 RequestHandler 中重要的部分。我遗漏了一些通常不需要的错误处理/异常捕获。请注意,我对读取publishedOn有不同Scheduler(或者至少,这就是我想要做的),因此这种阻塞读取不会干扰我的主事件线程:
private Mono<ServerResponse> writeToServerResponse(@NotNull FPTag tag) {
final long blobSize = tag.getBlobSize();
return ServerResponse.ok()
.contentType(MediaType.APPLICATION_OCTET_STREAM)
.body(Flux.<DataBuffer>create((FluxSink<DataBuffer> emitter) -> {
// for a really big blob I want to read it in chunks, so that my server doesn't use too much memory
for(int i = 0; i < blobSize; i+= tagChunkSize) {
// new DataBuffer that is written to, then emitted later
DefaultDataBuffer dataBuffer = new DefaultDataBufferFactory().allocateBuffer();
try (OutputStream outputStream = dataBuffer.asOutputStream()) {
// write to the outputstream of DataBuffer
tag.BlobReadPartial(outputStream, i, tagChunkSize, FPLibraryConstants.FP_OPTION_DEFAULT_OPTIONS);
// don't know if flushing is strictly neccessary
outputStream.flush();
} catch (IOException | FPLibraryException e) {
log.error("Error reading + writing from tag to http outputstream", e);
emitter.error(e);
}
emitter.next(dataBuffer);
}
// if blob is finished, send "complete" to my flux of DataBuffers
emitter.complete();
}, FluxSink.OverflowStrategy.BUFFER).publishOn(Schedulers.newElastic("centera")).doOnComplete(() -> closeQuietly(tag)), DataBuffer.class);
}
Run Code Online (Sandbox Code Playgroud)
| 归档时间: |
|
| 查看次数: |
7857 次 |
| 最近记录: |