Reactor groupBy:取消 GroupedFlux 后剩余项目会怎样?

Sok*_*lov 6 project-reactor

我需要按高基数的键对无限 Flux 进行分组。

例如:

  1. 组密钥是域 url
  2. 对一个域的调用应该严格按顺序进行(下一个调用发生在前一个域完成之后)
  3. 对不同域的调用应该是并发的
  4. 具有相同键(url)的项目之间的时间间隔未知,但预计具有突发性质。在短时间内发出几个项目,然后长时间暂停直到下一组。
queue
    .groupBy(keyMapper, groupPrefetch)
    .flatMap(
       { group ->
           group.concatMap(
               { task -> makeSlowRemoteCall(task) },
               0
           )
           .takeUntil { remoteCallResult -> remoteCallResult == DONE }
           .timeout(groupTimeout, Mono.empty())
           .then()
       }
      , concurrency
    )

Run Code Online (Sandbox Code Playgroud)

我在两种情况下取​​消群组:

  1. makeSlowRemoteCall()结果表明,在不久的将来,该组中很可能不会有新项目。

  2. 期间不会发出下一项groupTimeout。我使用timeout(timeout, fallback)变体来抑制 TimeoutException 并允许 flatMap 的内部发布者成功完成。

我希望未来可能的项目具有相同的密钥来创建新的 GroupedFlux 并使用相同的 flatMap 内部管道进行处理。

但是,如果我取消GroupedFlux 时仍有未请求的项目,会发生什么情况?

groupBy运算符是否将它们重新排队到具有相同密钥的新组中,否则它们将永远丢失。如果稍后什么是解决我的问题的正确方法。我也不确定在这种情况下是否需要将 concatMap()预取设置为 0。

Sok*_*lov 2

我认为groupBy()操作员不适合我的任务,因为它有无限的源和很多组。它会产生无限的组,因此有必要以某种方式取消下游的空闲组。但取消 GroupedFlux 并保证其没有未消耗的元素是不可能的。

我认为拥有发出有限组的 groupBy 变体会很棒。就像是groupBy(keyMapper, boundryPredicate)。当boundryPredicate返回true时,当前组已完成,具有相同键的下一个元素将开始新组。