GroupedFlux<T> 创建的 groupBy 运算符的数量是否有限

exc*_*bot 4 java project-reactor

根据文档参考, groupBy 运算符根据运算符的键盘映射器功能将给定的 Flux 拆分为多个 GroupedFlux。如果我使用 257 个整数范围执行以下代码,它可以正常工作,但不能使用 258 个整数

    public void groupByTest() {
    Flux.range(1, 258)
            .groupBy(val -> val)
            .concatMap(g -> g.map(val -> val + "test"))
            .doOnNext(System.out::println)
            .blockLast();
}
Run Code Online (Sandbox Code Playgroud)

这是否意味着 groupBy 运算符不能创建超过 257 个组?

Sim*_*slé 7

正如 javadoc 中所述groupBy

这些组需要在下游被排出和消耗,groupBy 才能正常工作。值得注意的是,当标准生成大量组时,如果下游没有适当地消耗这些组(例如,由于参数设置得太低),则可能会导致flatMap挂起maxConcurrency

这意味着一旦一个组被发出,groupBy就需要收到更多请求才能取得进展。默认情况下,它最多打开 256 个组,然后需要更多请求或检测组是否完整。并且groupBy无法“知道”一个组是否完整,直到:

  • A)组被取消(在这种情况下,如果稍​​后出现具有相同键的值,它将重新创建一个新组)
  • B) 源已被完全处理(只有当源 < 256 个元素、默认的 groupByprefetch或从源groupBy接收到信号时才会发生这种情况)onComplete

这两个val -> val标准都concatMap符合这些要求。

groupBy标准最终会产生与值一样多的组。这里有 258 个组,而默认容量可groupBy跟踪 256 个组。

注意:如果整个序列开始时少于 256 组,则可以正常工作。 尝试将标准设置为val -> val % 2并查看其是否有效。然后尝试将范围提高到range(1, 513),看看它再次挂起的情况

由于工作原理,最后一次测试仅限于 512 个元素concatMap

concatMap在我们的例子中尤其糟糕,因为它只会订阅下一组并在第一组完成时取得进展。groupBy这与上面的条件B)相冲突,造成既不进展也不可能concatMap进展的情况。

注意:在 513 的小示例中,concatMap将开始消耗组 1 并等待其完成,然后再消耗组 2。但是一旦 groupBy为组 1 获取了 256 个元素,就会停止发射,然后等待下游开始消耗组 2。结果,它的数据太少,无法检测到组是否已完成, concatMap等待该完成信号,并且从不订阅组 2,从而挂起整个组。

使用 aflatMap可以解决这个问题,因为flatMap将同时订阅多个组,并且 2 个组对它来说没有问题:它将消耗两个组并取得进展。

  • @levantpied,可能有一种方法假设可以检测到每个组中的最后一个元素:然后您可以使用 `takeUntil(Predicate)` 或 `takeWhile(Predicate)` 以便在非正式完成时触发组的取消。从 groupBy 角度来看,取消组会关闭它 (2认同)