boundedElastic() 与 parallel() 调度程序之间的区别

MNK*_*MNK 5 spring scheduler reactive-programming

我是 Project reactor 的新手,我正在尝试了解 boundedElastic() 与 parallel() 调度程序之间的区别。文档(https://projectreactor.io/docs/core/release/api/)说 boundedElastic() 用于阻塞任务,parallel() 用于非阻塞任务。为什么 Project reactor 需要解决阻塞场景,因为它们本质上是非阻塞的。有人可以帮我解决一些有关 boundedElastic() 与 parallel() 调度程序的实时用例吗?

Ale*_*lex 11

长话短说

Reactor 在少量线程上执行非阻塞/异步任务。如果任务阻塞 - 线程将被阻塞并且所有其他任务将等待它。

  • parallel应用于快速非阻塞操作(默认选项)
  • boundedElastic应该用于“卸载”阻塞任务

一般来说,Reactor API 与并发无关,使用Schedulers抽象来执行任务。Schedulers职责与 非常相似ExecutorService

Schedulers.parallel()

应该是默认选项,用于少量线程上的快速非阻塞操作。默认情况下,线程数等于 CPU 核心数。它可以通过系统属性来控制reactor.schedulers.defaultPoolSize

Schedulers.boundedElastic()

用于执行较长的操作(阻塞任务)作为反应流的一部分。它将使用线程池,默认线程数为 CPU 核心数 x 10(可以通过 控制reactor.schedulers.defaultBoundedElasticSize),每个线程的默认队列大小为 100000 ( reactor.schedulers.defaultBoundedElasticSize)。

subscribeOn或者publishOn可以用来改变调度程序。

下面的代码展示了如何包装阻塞操作

Mono.fromCallable(() -> {
    // blocking operation
}).subscribeOn(Schedulers.boundedElastic()); // run on a separate scheduler because code is blocking
Run Code Online (Sandbox Code Playgroud)

Schedulers.newBoundedElastic() 与此类似,Schedulers.boundedElastic()但当您需要为某些操作创建单独的线程池时很有用。

有时,哪些代码被阻塞并不明显。BlockHound是测试反应式代码时非常有用的工具


小智 4

并行风格由 N 个工作线程(根据 N 个 cpu)支持,每个工作线程都基于 ScheduledExecutorService。如果您向其提交 N 个长期任务,则无法执行更多工作,因此对短期任务具有亲和力。

弹性风味还得到基于 ScheduledExecutorService 的工作人员的支持,只不过它按需创建这些工作人员并将它们汇集起来。BoundedElastic 和elastic 一样,不同的是你可以限制总数。线程。

https://spring.io/blog/2019/12/13/flight-of-the-flux-3-hopping-threads-and-schedulers