Reactor 项目中的 Parallel Flux 与 Flux

Dee*_*and 8 java spring-boot project-reactor spring-webflux

所以我从文档中了解到的是,并行通量本质上是将通量元素划分为单独的轨道。(本质上类似于分组)。就线程而言,这将是调度程序的工作。让我们考虑这样的情况。所有这些都将在通过 runOn() 方法提供的同一调度程序实例上运行。让我们考虑如下情况:

Mono<Response> = webClientCallAPi(..) //function returning Mono from webclient call
Run Code Online (Sandbox Code Playgroud)

现在假设我们拨打了大约 100 个电话

Flux.range(0,100).subscribeOn(Schedulers.boundedElastic()).flatMap(i -> webClientCallApi(i)).collecttoList() // or subscribe somehow
Run Code Online (Sandbox Code Playgroud)

如果我们使用 paralleFlux:

Flux.range(0,100).parallel().runOn(Schedulers.boundedElastic()).flatMap(i -> webClientCallApi(i)).sequential().collecttoList();
Run Code Online (Sandbox Code Playgroud)

所以如果我的理解是正确的,那么它看起来非常相似。那么 ParallelFlux 相对于 Flux 有哪些优势以及什么时候应该使用 parallelFlux 相对于 Flux 呢?

Mic*_*rry 19

在实践中,您可能很少需要使用并行通量,包括在本例中。

在您的示例中,您将发出 100 个 Web 服务调用。请记住,执行此操作所需的实际工作量非常低 - 您生成并发出异步请求,然后一段时间后您会收到响应。在请求和响应之间,您根本不做任何工作,只是在发送每个请求时占用少量的 CPU 资源,而在收到每个响应时又占用少量的 CPU 资源。(这是使用异步框架发出 Web 请求的核心优势之一,在请求进行时您不会占用任何线程。)

如果您分割此通量并并行运行它,则表示您希望分割这些少量的 CPU 资源,以便它们可以在不同的 CPU 内核上同时运行。这绝对没有意义——分割通量、并行运行然后稍后组合它的开销将比仅仅让它在正常的顺序调度器上执行要大得多。

另一方面,假设我有一个Flux<Integer>,并且我想检查每个整数是否是质数,或者可能是Flux<String>我想根据 BCrypt 哈希检查的密码。这些类型的操作确实是CPU 密集型的,因此在这种情况下,用于跨内核分割执行的并行通量可能会很有意义。但实际上,在正常的反应堆用例中,这些情况很少发生。

(另外,作为结束语,您几乎总是希望使用Schedulers.parallel()并行通量,而不是Schedulers.boundedElastic()。)

  • subscribeOn 和publishOn 使用来自指定发布者的单个线程,它不会并发 (2认同)
  • @DeekshithAnand“如果你做了类似 Flux.range(1,100).publishOn(Schedulers.parallel()) 的事情,它只是并行地分配事物,对吗?” - 一点都不。这永远不能并行执行 Flux 任务,因为底层的非并行 Flux 一次只能提供一个任务。您看到 Web 请求同时执行的事实并不是并行性的证明(而是并发性的证明。) (2认同)
  • 啊..我发现并发和并行之间存在细微的差别...这很清楚。蒂! (2认同)