Dmi*_*huk 0 java concurrency group-by project-reactor
我需要处理来自Flux组内 (by id) 的事件,以便在单个组内按顺序处理每个事件,但并行处理组。据我所知,这可以通过groupBy和来实现concatMap。当我实现这个时,我的测试开始无限期地挂在一些大量的唯一 ID 上。我将问题与下面的代码隔离开来,并找到了代码开始挂起的特定数字 - 256。我绝对不明白为什么会发生这种情况以及从何256而来。
这是挂起的代码:
@ParameterizedTest
@ValueSource(ints = {250, 251, 252, 253, 254, 255, 256})
void freezeTest(int uniqueStringsCount) {
var scheduler = Schedulers
.newBoundedElastic(
1000,
1000,
"really-big-scheduler"
);
Flux.range(0, uniqueStringsCount)
.map(Object::toString)
.repeat()
// this represents "a lot of events"
.take(50_000)
.groupBy(x -> x)
// this gets the same results
// .parallel(400)
.parallel()
.flatMap(group ->
group.concatMap(e ->
// this represents a processing operation on each event
Mono.fromRunnable(() -> {
try {
Thread.sleep(0);
} catch (InterruptedException ex) {
throw new RuntimeException(ex);
}
})
// this also doesn't work
// Mono.delay(Duration.ofMillis(0))
// Mono.empty()
// big scheduler doesn't help either
// ).subscribeOn(scheduler)
)
// ).runOn(scheduler)
).runOn(Schedulers.parallel())
.then()
.block();
}
Run Code Online (Sandbox Code Playgroud)
我们首先构造一个Flux有很多(50k,只是一个例子)的Stringa。但其中只有一定数量的唯一字符串Flux,这些字符串被分成并行处理的一定数量的组。但每组内的事件都是通过 顺序处理的concatMap。并且此代码仅挂在256唯一的字符串上。
最初,我认为某个地方的某个线程池已耗尽,因此我添加了一个really-big-scheduler来测试它 - 但它只会执行速度较慢并且还会挂起256。然后我尝试删除阻塞Thread.sleep(我从这个开始,因为我的实际实现可能是阻塞的) - 但它也挂起256
此外,更改parallelism(400在上面的代码中)不会改变任何东西。
小智 5
Flux.groupBy在处理大量组时需要格外小心,如其javadoc 中所述:
请注意,groupBy 在组基数较低的情况下效果最好,因此请相应地选择 keyMapper 函数。
这些组需要在下游被排出和消耗,groupBy 才能正常工作。值得注意的是,当标准生成大量组时,如果下游没有适当地使用这些组(例如,由于 maxConcurrency 参数设置得太低的 flatMap),可能会导致挂起。
这里预取量设置得太低:默认情况下它设置为Queues.SMALL_BUFFER_SIZE,这是默认的256(可以使用属性 进行更改reactor.bufferSize.small)。Flux.groupBy 有一种手动设置预取量的方法:Flux.groupBy(Function, int),因此我建议将您的运算符替换为.groupBy(x -> x, 1024)或其他合适的高量。
预取量很重要,因为它是它可以处理的未完成项目的数量。在您的情况下,首先Scheduler.createWorker()进行 255 个调用,每个项目都放在一个 Worker 上,然后将其和创建的 GroupedFlux 放入 groupBy 的内部队列中等待 Worker 完成。当第 256 个项目在任何 Worker 完成之前出现时,它无法将其放入队列中,并挂起。
| 归档时间: |
|
| 查看次数: |
804 次 |
| 最近记录: |