为什么此 Flux 的处理会无限期地挂在大小 256 上?

Dmi*_*huk 0 java concurrency group-by project-reactor

我需要处理来自Flux组内 (by id) 的事件,以便在单个组内按顺序处理每个事件,但并行处理组。据我所知,这可以通过groupBy和来实现concatMap。当我实现这个时,我的测试开始无限期地挂在一些大量的唯一 ID 上。我将问题与下面的代码隔离开来,并找到了代码开始挂起的特定数字 - 256。我绝对不明白为什么会发生这种情况以及从何256而来。

这是挂起的代码:

@ParameterizedTest
@ValueSource(ints = {250, 251, 252, 253, 254, 255, 256})
void freezeTest(int uniqueStringsCount) {
  var scheduler = Schedulers
      .newBoundedElastic(
          1000,
          1000,
          "really-big-scheduler"
      );
  Flux.range(0, uniqueStringsCount)
      .map(Object::toString)
      .repeat()
      // this represents "a lot of events"
      .take(50_000)
      .groupBy(x -> x)
      // this gets the same results
      // .parallel(400)
      .parallel()
      .flatMap(group ->
          group.concatMap(e ->

              // this represents a processing operation on each event
              Mono.fromRunnable(() -> {
                    try {
                      Thread.sleep(0);
                    } catch (InterruptedException ex) {
                      throw new RuntimeException(ex);
                    }
                  })

              // this also doesn't work
              // Mono.delay(Duration.ofMillis(0))
              // Mono.empty()

          // big scheduler doesn't help either
          // ).subscribeOn(scheduler)
          )
      // ).runOn(scheduler)
      ).runOn(Schedulers.parallel())
      .then()
      .block();
}
Run Code Online (Sandbox Code Playgroud)

我们首先构造一个Flux有很多(50k,只是一个例子)的Stringa。但其中只有一定数量的唯一字符串Flux,这些字符串被分成并行处理的一定数量的组。但每组内的事件都是通过 顺序处理的concatMap。并且此代码仅挂在256唯一的字符串上。

最初,我认为某个地方的某个线程池已耗尽,因此我添加了一个really-big-scheduler来测试它 - 但它只会执行速度较慢并且还会挂起256。然后我尝试删除阻塞Thread.sleep(我从这个开始,因为我的实际实现可能是阻塞的) - 但它也挂起256 此外,更改parallelism400在上面的代码中)不会改变任何东西。

小智 5

Flux.groupBy在处理大量组时需要格外小心,如其javadoc 中所述:

请注意,groupBy 在组基数较低的情况下效果最好,因此请相应地选择 keyMapper 函数。

这些组需要在下游被排出和消耗,groupBy 才能正常工作。值得注意的是,当标准生成大量组时,如果下游没有适当地使用这些组(例如,由于 maxConcurrency 参数设置得太低的 flatMap),可能会导致挂起。

这里预取量设置得太低:默认情况下它设置为Queues.SMALL_BUFFER_SIZE,这是默认的256(可以使用属性 进行更改reactor.bufferSize.small)。Flux.groupBy 有一种手动设置预取量的方法:Flux.groupBy(Function, int),因此我建议将您的运算符替换为.groupBy(x -> x, 1024)或其他合适的高量。

预取量很重要,因为它是它可以处理的未完成项目的数量。在您的情况下,首先Scheduler.createWorker()进行 255 个调用,每个项目都放在一个 Worker 上,然后将其和创建的 GroupedFlux 放入 groupBy 的内部队列中等待 Worker 完成。当第 256 个项目在任何 Worker 完成之前出现时,它无法将其放入队列中,并挂起。