有没有办法使用 Reactor 异步读取 InputStream 或转换为字节?

Dan*_*mez 5 java spring asynchronous project-reactor

我正在尝试将文件上传到S3,但是JVM说我在代码片段中有一个线程阻塞方法调用,其中调用file.readAllBytes()时线程不应该被阻塞,所以有没有办法使方法与 Flux 或 Mono 异步?或任何其他方法来解决这个问题?

private Mono<Boolean> uploadFile(InputStream file, String bucket, String name) {
        try {
            return uploadAdapter.uploadObject(bucket,name,file.readAllBytes());
        } catch (IOException e) {
            return Mono.just(false);
        }
    }
@Override
    public Mono<Boolean> uploadObject(String bucketName, String objectKey, byte[] fileContent) {
        return Mono.fromFuture(
                        s3AsyncClient.putObject(configurePutObject(bucketName, objectKey),
                                AsyncRequestBody.fromBytes(fileContent)))
                .map(response -> response.sdkHttpResponse().isSuccessful());
    }
Run Code Online (Sandbox Code Playgroud)

use*_*211 2

由于InputStream是同步 API,因此您有 2 个选择,对于任何其他同步 API 也是如此:

  1. 切换到另一个 API。对于许多问题来说,这可能是一个很好且可行的解决方案。反应式概念和异步通常非常常见,对于大多数需求,有一个替代的异步库可以做同样的事情。对于您的情况,您可以使用Reactor-Netty 库java.nio2中的 , 或 函数,该库针对此用例提供了很好的解决方案。
  2. 使用另一个调度程序。项目反应器建议所有异步调用都将在一个非阻塞调度程序上运行,对于同步调用,请使用另一个(阻塞)每个请求线程调度程序。您可以使用两个这样的调度程序:single()boundedElastic()。不同之处在于,它boundedElastic()限制了您可以打开的线程数量,因此您最终将使用线程作为阻塞队列,这比single().