Reactor 3.x - 限制 groupBy Flux 的时间

jam*_*ham 6 project-reactor

不管上游的完整性如何,有没有办法强制 groupBy() 生成的 Flux 在一段时间后完成(或类似地,限制“打开”组的最大数量)?我有如下内容:

Flux<Foo> someFastPublisher;

someFastPublisher
  .groupBy(f -> f.getKey())
  .delayElements(Duration.ofSeconds(1)) // rate limit each group
  .flatMap(g -> g) // unwind the group
  .subscribe()
;
Run Code Online (Sandbox Code Playgroud)

并且遇到了 Flux 挂起的情况,假设是因为组数大于flatMap并发性。我可以增加flatMap并发性,但没有简单的方法来判断最大可能的大小是多少。相反,我知道Foo被分组的'sFoo.key将在时间/发布顺序上彼此接近,并且宁愿在 groupBy Flux 与 flatMap 并发性上使用某种时间窗口(并以两个不同的组 w 结束) / 同样key()没什么大不了的)。

我猜groupByFlux 不会在 onComplete 之前someFastPubisheronComplete - 即 Flux 被移交以flatMap保持“开放”(尽管他们不太可能获得新事件)。

我可以通过Integer.MAX在 groupBy 中预取或Integer.MAX并发来解决这个问题- 但是有没有办法控制组的“生命”?

Sim*_*slé 7

是的:您可以take(Duration)对组应用 a以确保它们提前关闭,之后将打开具有相同密钥的新组:

source.groupBy(v -> v.intValue() % 2)
      .flatMap(group -> group
              .take(Duration.ofMillis(1000))
              .count()
              .map(c -> "group " + group.key() + " size = " + c)
      )
      .log()
      .blockLast();
Run Code Online (Sandbox Code Playgroud)

  • 考虑到谁回答了这个问题,我会相信这个作品:) 但我很难想象发生了什么。`take()` 会导致 groupedBy Flux 在 Duration 之后 onComplete 吗?和/或如何向 GroupedFlux 上游发出信号? (3认同)