exc*_*bot 4 java project-reactor
根据文档参考, groupBy 运算符根据运算符的键盘映射器功能将给定的 Flux 拆分为多个 GroupedFlux。如果我使用 257 个整数范围执行以下代码,它可以正常工作,但不能使用 258 个整数
public void groupByTest() {
Flux.range(1, 258)
.groupBy(val -> val)
.concatMap(g -> g.map(val -> val + "test"))
.doOnNext(System.out::println)
.blockLast();
}
Run Code Online (Sandbox Code Playgroud)
这是否意味着 groupBy 运算符不能创建超过 257 个组?
正如 javadoc 中所述groupBy
:
这些组需要在下游被排出和消耗,groupBy 才能正常工作。值得注意的是,当标准生成大量组时,如果下游没有适当地消耗这些组(例如,由于参数设置得太低),则可能会导致
flatMap
挂起maxConcurrency
。
这意味着一旦一个组被发出,groupBy
就需要收到更多请求才能取得进展。默认情况下,它最多打开 256 个组,然后需要更多请求或检测组是否完整。并且groupBy
无法“知道”一个组是否完整,直到:
prefetch
或从源groupBy
接收到信号时才会发生这种情况)onComplete
这两个val -> val
标准都concatMap
符合这些要求。
该groupBy
标准最终会产生与值一样多的组。这里有 258 个组,而默认容量可groupBy
跟踪 256 个组。
注意:如果整个序列开始时少于 256 组,则可以正常工作。 尝试将标准设置为
val -> val % 2
并查看其是否有效。然后尝试将范围提高到range(1, 513)
,看看它再次挂起的情况。
由于工作原理,最后一次测试仅限于 512 个元素concatMap
。
concatMap
在我们的例子中尤其糟糕,因为它只会订阅下一组并在第一组完成时取得进展。groupBy
这与上面的条件B)相冲突,造成既不进展也不可能concatMap
进展的情况。
注意:在 513 的小示例中,
concatMap
将开始消耗组 1 并等待其完成,然后再消耗组 2。但是一旦groupBy
为组 1 获取了 256 个元素,就会停止发射,然后等待下游开始消耗组 2。结果,它的数据太少,无法检测到组是否已完成,concatMap
等待该完成信号,并且从不订阅组 2,从而挂起整个组。使用 a
flatMap
可以解决这个问题,因为flatMap
将同时订阅多个组,并且 2 个组对它来说没有问题:它将消耗两个组并取得进展。
归档时间: |
|
查看次数: |
2189 次 |
最近记录: |