用于阻塞I/O任务的ParallelFlux与flatMap()

Cor*_*her 13 project-reactor reactive-streams

我有一个Project Reactor链,它包含一个阻塞任务(网络调用,我们需要等待响应).我想同时运行多个阻塞任务.

似乎可以使用ParallelFlux或flatMap(),裸骨示例:

Flux.just(1)
    .repeat(10)
    .parallel(3)
    .runOn(Schedulers.elastic())
    .doOnNext(i -> blockingTask())
    .sequential()
    .subscribe()
Run Code Online (Sandbox Code Playgroud)

要么

Flux.just(1)
    .repeat(10)
    .flatMap(i -> Mono.fromCallable(() -> {blockingTask(); return i;}).subscribeOn(Schedulers.elastic()), 3)
    .subscribe();
Run Code Online (Sandbox Code Playgroud)

这两种技术的优点是什么?一个比另一个更受欢迎吗?还有其他选择吗?

Sim*_*slé 13

parallel为性能目的而定制的任务并行化,以及"rails"或"groups"之间的工作调度,每个都从Scheduler你传递给他们自己的执行上下文runOn.简而言之,如果您进行CPU密集型工作,它将使您的所有CPU内核都能正常工作.但是你正在进行I/O约束工作......

所以在你的情况下,flatMap是一个更好的候选人.这使用的flatMap并行化更多的是编排.

这几乎是两种选择,如果你不计算flatMap那种稍微不同的味道flatMapSequential(concatMap实际上并不允许并行化).

  • 同意,通过_blocking_示例,很难看出区别。但如果“flatMap”使用的源是非阻塞的(I/O 是转换为非阻塞实现的主要候选者),那么“flatMap”可以真正在那里大放异彩。该操作符将充当事件循环,在准备就绪时从 IO 发布者处获取通知,并确保所有这些元素都串行发布到下游 (4认同)
  • 我看不出有什么区别。使用flatMap,您仍然必须订阅不同的“线程” (3认同)