小编Dmi*_*huk的帖子

为什么此 Flux 的处理会无限期地挂在大小 256 上?

我需要处理来自Flux组内 (by id) 的事件,以便在单个组内按顺序处理每个事件,但并行处理组。据我所知,这可以通过groupBy和来实现concatMap。当我实现这个时,我的测试开始无限期地挂在一些大量的唯一 ID 上。我将问题与下面的代码隔离开来,并找到了代码开始挂起的特定数字 - 256。我绝对不明白为什么会发生这种情况以及从何256而来。

这是挂起的代码:

@ParameterizedTest
@ValueSource(ints = {250, 251, 252, 253, 254, 255, 256})
void freezeTest(int uniqueStringsCount) {
  var scheduler = Schedulers
      .newBoundedElastic(
          1000,
          1000,
          "really-big-scheduler"
      );
  Flux.range(0, uniqueStringsCount)
      .map(Object::toString)
      .repeat()
      // this represents "a lot of events"
      .take(50_000)
      .groupBy(x -> x)
      // this gets the same results
      // .parallel(400)
      .parallel()
      .flatMap(group ->
          group.concatMap(e ->

              // this represents a processing operation on each event
              Mono.fromRunnable(() …
Run Code Online (Sandbox Code Playgroud)

java concurrency group-by project-reactor

0
推荐指数
1
解决办法
804
查看次数

标签 统计

concurrency ×1

group-by ×1

java ×1

project-reactor ×1