不管上游的完整性如何,有没有办法强制 groupBy() 生成的 Flux 在一段时间后完成(或类似地,限制“打开”组的最大数量)?我有如下内容:
Flux<Foo> someFastPublisher;
someFastPublisher
.groupBy(f -> f.getKey())
.delayElements(Duration.ofSeconds(1)) // rate limit each group
.flatMap(g -> g) // unwind the group
.subscribe()
;
Run Code Online (Sandbox Code Playgroud)
并且遇到了 Flux 挂起的情况,假设是因为组数大于flatMap并发性。我可以增加flatMap并发性,但没有简单的方法来判断最大可能的大小是多少。相反,我知道Foo被分组的'sFoo.key将在时间/发布顺序上彼此接近,并且宁愿在 groupBy Flux 与 flatMap 并发性上使用某种时间窗口(并以两个不同的组 w 结束) / 同样key()没什么大不了的)。
我猜groupByFlux 不会在 onComplete 之前someFastPubisheronComplete - 即 Flux 被移交以flatMap保持“开放”(尽管他们不太可能获得新事件)。
我可以通过Integer.MAX在 groupBy 中预取或Integer.MAX并发来解决这个问题- 但是有没有办法控制组的“生命”?
是的:您可以take(Duration)对组应用 a以确保它们提前关闭,之后将打开具有相同密钥的新组:
source.groupBy(v -> v.intValue() % 2)
.flatMap(group -> group
.take(Duration.ofMillis(1000))
.count()
.map(c -> "group " + group.key() + " size = " + c)
)
.log()
.blockLast();
Run Code Online (Sandbox Code Playgroud)
| 归档时间: |
|
| 查看次数: |
1667 次 |
| 最近记录: |