Art*_*lin 5 kotlin project-reactor reactive-streams spring-webflux
鉴于我Flux<String>的大小未知,如何将其转换为InputStream其他库所期望的?
例如使用 WebClient 我可以使用这种方法实现
WebClient.get('example.com').exchange.flatMap { it.bodyToMono(InputStreamResource::class.java) }.map { it.inputStream }
但是当我Flux<String>作为输入时,我不知道如何做同样的事情?
可能有很多方法可以做到这一点。一种可能性是使用PipedInputStream和PipedOutputStream。
它的工作方式是将输出流链接到输入流,这样您写入输出流的所有内容都可以从链接的输入流中读取,通过这样做,在它们两者之间创建一个管道。
PipedInputStream in = new PipedInputStream();
PipedOutputStream out = PipedOutputStream(in);
Run Code Online (Sandbox Code Playgroud)
但是,有一个警告,根据管道流的文档,写入过程和读取过程必须发生在不同的线程上,否则我们可能会导致死锁。
因此,回到我们的反应流场景,我们可以创建一个管道(如上所述)并订阅Flux对象和从中获得的数据,然后将其写入管道输出流。无论你在那里写什么,都可以在管道的另一端,在相应的输入流中读取。此输入流是您可以与非反应式方法共享的输入流。
我们只需要格外小心,我们就可以在单独的线程上订阅 Flux,例如subscribeOn(Schedulers.elastic()).
这是此类订阅者的一个非常基本的实现:
class PipedStreamSubscriber extends BaseSubscriber<byte[]> {
private final Logger logger = LoggerFactory.getLogger(this.getClass());
private final PipedInputStream in;
private PipedOutputStream out;
PipedStreamSubscriber(PipedInputStream in) {
Objects.requireNonNull(in, "The input stream must not be null");
this.in = in;
}
@Override
protected void hookOnSubscribe(Subscription subscription) {
//change if you want to control back-pressure
super.hookOnSubscribe(subscription);
try {
this.out = new PipedOutputStream(in);
} catch (IOException e) {
//TODO throw a contextual exception here
throw new RuntimeException(e);
}
}
@Override
protected void hookOnNext(byte[] payload) {
try {
out.write(payload);
} catch (IOException e) {
//TODO throw a contextual exception here
throw new RuntimeException(e);
}
}
@Override
protected void hookOnComplete() {
close();
}
@Override
protected void hookOnError(Throwable error) {
//TODO handle the error or at least log it
logger.error("Failure processing stream", error);
close();
}
@Override
protected void hookOnCancel() {
close();
}
private void close() {
try {
if (out != null) {
out.close();
}
} catch (IOException e) {
//probably just ignore this one or simply log it
}
}
}
Run Code Online (Sandbox Code Playgroud)
使用这个订阅者,我可以定义一个非常简单的实用方法,将 aFlux<byte[]变成 an InputStream,如下所示:
static InputStream createInputStream(Flux<byte[]> flux) {
PipedInputStream in = new PipedInputStream();
flux.subscribeOn(Schedulers.elastic())
.subscribe(new PipedStreamSubscriber(in));
return in;
}
Run Code Online (Sandbox Code Playgroud)
请注意,当流程完成、发生错误或取消订阅时,我特别小心地关闭输出流,否则我们将面临在读取端阻塞的风险,等待更多输入到达。关闭输出流是管道另一侧输入流结束的信号。
现在 InputStream 可以像任何常规流一样被使用,因此您可以将它传递给您的非反应性方法,例如
Flux<byte[]> jedi = Flux.just("Luke\n", "Obi-Wan\n", "Yoda\n").map(String::getBytes);
try (InputStream in = createInputStream(jedi)) {
byte[] data = new byte[5];
int size = 0;
while ((size = in.read(data)) > 0) {
System.out.printf("%s", new String(data, 0, size));
}
}
Run Code Online (Sandbox Code Playgroud)
上面的代码产生:
Luke
Obi-Wan
Yoda
Run Code Online (Sandbox Code Playgroud)
| 归档时间: |
|
| 查看次数: |
6513 次 |
| 最近记录: |