如何将 Reactor Flux<String> 转换为 InputStream

Art*_*lin 5 kotlin project-reactor reactive-streams spring-webflux

鉴于我Flux<String>的大小未知,如何将其转换为InputStream其他库所期望的?

例如使用 WebClient 我可以使用这种方法实现

WebClient.get('example.com').exchange.flatMap { it.bodyToMono(InputStreamResource::class.java) }.map { it.inputStream }

但是当我Flux<String>作为输入时,我不知道如何做同样的事情?

Edw*_*rzo 7

可能有很多方法可以做到这一点。一种可能性是使用PipedInputStreamPipedOutputStream

它的工作方式是将输出流链接到输入流,这样您写入输出流的所有内容都可以从链接的输入流中读取,通过这样做,在它们两者之间创建一个管道。

PipedInputStream in = new PipedInputStream();
PipedOutputStream out = PipedOutputStream(in);
Run Code Online (Sandbox Code Playgroud)

但是,有一个警告,根据管道流的文档,写入过程和读取过程必须发生在不同的线程上,否则我们可能会导致死锁。

因此,回到我们的反应流场景,我们可以创建一个管道(如上所述)并订阅Flux对象和从中获得的数据,然后将其写入管道输出流。无论你在那里写什么,都可以在管道的另一端,在相应的输入流中读取。此输入流是您可以与非反应式方法共享的输入流。

我们只需要格外小心,我们就可以在单独的线程上订阅 Flux,例如subscribeOn(Schedulers.elastic()).

这是此类订阅者的一个非常基本的实现:

class PipedStreamSubscriber extends BaseSubscriber<byte[]> {

    private final Logger logger = LoggerFactory.getLogger(this.getClass());

    private final PipedInputStream in;
    private PipedOutputStream out;

    PipedStreamSubscriber(PipedInputStream in) {
        Objects.requireNonNull(in, "The input stream must not be null");
        this.in = in;
    }

    @Override
    protected void hookOnSubscribe(Subscription subscription) {
        //change if you want to control back-pressure
        super.hookOnSubscribe(subscription);
        try {
            this.out = new PipedOutputStream(in);
        } catch (IOException e) {
            //TODO throw a contextual exception here
            throw new RuntimeException(e);
        }
    }

    @Override
    protected void hookOnNext(byte[] payload) {
        try {
            out.write(payload);
        } catch (IOException e) {
            //TODO throw a contextual exception here
            throw new RuntimeException(e);
        }
    }

    @Override
    protected void hookOnComplete() {
        close();
    }

    @Override
    protected void hookOnError(Throwable error) {
        //TODO handle the error or at least log it
        logger.error("Failure processing stream", error);
        close();
    }

    @Override
    protected void hookOnCancel() {
        close();
    }

    private void close() {
        try {
            if (out != null) {
                out.close();
            }
        } catch (IOException e) {
            //probably just ignore this one or simply  log it
        }
    }
}
Run Code Online (Sandbox Code Playgroud)

使用这个订阅者,我可以定义一个非常简单的实用方法,将 aFlux<byte[]变成 an InputStream,如下所示:

static InputStream createInputStream(Flux<byte[]> flux) {

    PipedInputStream in = new PipedInputStream();
    flux.subscribeOn(Schedulers.elastic())
        .subscribe(new PipedStreamSubscriber(in));

    return in;
}
Run Code Online (Sandbox Code Playgroud)

请注意,当流程完成、发生错误或取消订阅时,我特别小心地关闭输出流,否则我们将面临在读取端阻塞的风险,等待更多输入到达。关闭输出流是管道另一侧输入流结束的信号。

现在 InputStream 可以像任何常规流一样被使用,因此您可以将它传递给您的非反应性方法,例如

Flux<byte[]> jedi = Flux.just("Luke\n", "Obi-Wan\n", "Yoda\n").map(String::getBytes);

try (InputStream in = createInputStream(jedi)) {
    byte[] data = new byte[5];
    int size = 0;
    while ((size = in.read(data)) > 0) {
        System.out.printf("%s", new String(data, 0, size));
    }
} 
Run Code Online (Sandbox Code Playgroud)

上面的代码产生:

Luke
Obi-Wan
Yoda
Run Code Online (Sandbox Code Playgroud)

  • 您能否添加一些有关内存消耗和延迟的信息?例如,如果我要在 100MB 的文件上使用此解决方案,该文件是否会完全加载到内存中?或者你的“System.out.printf”会在通量开始时立即开始输出吗?谢谢! (2认同)