我需要处理来自Flux组内 (by id) 的事件,以便在单个组内按顺序处理每个事件,但并行处理组。据我所知,这可以通过groupBy和来实现concatMap。当我实现这个时,我的测试开始无限期地挂在一些大量的唯一 ID 上。我将问题与下面的代码隔离开来,并找到了代码开始挂起的特定数字 - 256。我绝对不明白为什么会发生这种情况以及从何256而来。
这是挂起的代码:
@ParameterizedTest
@ValueSource(ints = {250, 251, 252, 253, 254, 255, 256})
void freezeTest(int uniqueStringsCount) {
var scheduler = Schedulers
.newBoundedElastic(
1000,
1000,
"really-big-scheduler"
);
Flux.range(0, uniqueStringsCount)
.map(Object::toString)
.repeat()
// this represents "a lot of events"
.take(50_000)
.groupBy(x -> x)
// this gets the same results
// .parallel(400)
.parallel()
.flatMap(group ->
group.concatMap(e ->
// this represents a processing operation on each event
Mono.fromRunnable(() …Run Code Online (Sandbox Code Playgroud)