我需要按高基数的键对无限 Flux 进行分组。
例如:
queue
.groupBy(keyMapper, groupPrefetch)
.flatMap(
{ group ->
group.concatMap(
{ task -> makeSlowRemoteCall(task) },
0
)
.takeUntil { remoteCallResult -> remoteCallResult == DONE }
.timeout(groupTimeout, Mono.empty())
.then()
}
, concurrency
)
Run Code Online (Sandbox Code Playgroud)
我在两种情况下取消群组:
makeSlowRemoteCall()结果表明,在不久的将来,该组中很可能不会有新项目。
期间不会发出下一项groupTimeout。我使用timeout(timeout, fallback)变体来抑制 TimeoutException 并允许 flatMap 的内部发布者成功完成。
我希望未来可能的项目具有相同的密钥来创建新的 GroupedFlux 并使用相同的 flatMap 内部管道进行处理。
但是,如果我取消GroupedFlux 时仍有未请求的项目,会发生什么情况?
groupBy运算符是否将它们重新排队到具有相同密钥的新组中,否则它们将永远丢失。如果稍后什么是解决我的问题的正确方法。我也不确定在这种情况下是否需要将 concatMap()预取设置为 0。
我认为groupBy()操作员不适合我的任务,因为它有无限的源和很多组。它会产生无限的组,因此有必要以某种方式取消下游的空闲组。但取消 GroupedFlux 并保证其没有未消耗的元素是不可能的。
我认为拥有发出有限组的 groupBy 变体会很棒。就像是groupBy(keyMapper, boundryPredicate)。当boundryPredicate返回true时,当前组已完成,具有相同键的下一个元素将开始新组。
| 归档时间: |
|
| 查看次数: |
821 次 |
| 最近记录: |