以下是将 Flux 拆分为不同的处理路径并将它们连接回来的正确/惯用方法 - 就问题而言,不应丢弃事件,排序不重要,并且内存是无限的。
Flux<Integer> beforeFork = Flux.range(1, 10);
ConnectableFlux<Integer> forkPoint = beforeFork
.publish()
;
Flux<String> slowPath = forkPoint
.filter(i -> i % 2 == 0)
.map(i -> "slow"+"_"+i)
.delayElements(Duration.ofSeconds(1))
;
Flux<String> fastPath = forkPoint
.filter(i -> i % 2 != 0)
.map(i -> "fast"+"_"+i)
;
// merge vs concat since we need to eagerly subscribe to
// the ConnectableFlux before the connect()
Flux.merge(fastPath, slowPath)
.map(s -> s.toUpperCase()) // pretend this is a more complex sequence
.subscribe(System.out:println)
;
forkPoint.connect();
Run Code Online (Sandbox Code Playgroud)
如果 filter() 函数比 % 慢,我想我也可以 groupBy() 然后 filter() 在 key() 上。
请注意,我确实希望 slowPath 和 fastPath 从 beforeFork 点消耗相同的事件,因为 beforeFork 生成速度很慢。
请注意,我确实有一个更复杂的后续操作(即更改为range(1,100)并且围绕预取边界的行为让我感到困惑)-但只有在上述代码段合法时我才有意义。
我相信更常见的是这样写:
Flux<Integer> beforeFork = Flux.range(1, 10).publish().autoConnect(2);
Flux<String> slowPath = beforeFork
.filter(i -> i % 2 == 0)
.map(i -> "slow"+"_"+i)
.delayElements(Duration.ofSeconds(1));
Flux<String> fastPath = beforeFork
.filter(i -> i % 2 != 0)
.map(i -> "fast"+"_"+i);
Flux.merge(fastPath, slowPath)
.map(s -> s.toUpperCase())
.doOnNext(System.out::println)
.blockLast();
Run Code Online (Sandbox Code Playgroud)
变更摘要:
autoConnect(N)- 允许我们指定beforeFork发布者连接到 N 个订阅者之后。如果我们预先知道预期路径的数量,我们可以指定它并防止发布者缓存或重复执行。
blockingLast()- 我们阻止加入Flux本身。您可能已经注意到,如果运行当前代码,则只会记录快速结果。这是因为我们实际上并没有等待缓慢的结果完成。
这是假设您的原始数据Publisher是有限的,具有固定数量的元素。Flux.interval对于诸如持续流之类的东西,需要进行其他更改。
因为prefetch我可以向您推荐这个问题:
Project Reactor 中的预取意味着什么?
| 归档时间: |
|
| 查看次数: |
1032 次 |
| 最近记录: |