拆分/重新加入 Flux 的正确方法

jam*_*ham 6 project-reactor

以下是将 Flux 拆分为不同的处理路径并将它们连接回来的正确/惯用方法 - 就问题而言,不应丢弃事件,排序不重要,并且内存是无限的。

Flux<Integer> beforeFork = Flux.range(1, 10);

ConnectableFlux<Integer> forkPoint = beforeFork
    .publish()
;

Flux<String> slowPath = forkPoint
        .filter(i -> i % 2 == 0)
        .map(i -> "slow"+"_"+i)
        .delayElements(Duration.ofSeconds(1))
;

Flux<String> fastPath = forkPoint
        .filter(i -> i % 2 != 0)
        .map(i -> "fast"+"_"+i)
;

// merge vs concat since we need to eagerly subscribe to
// the ConnectableFlux before the connect()
Flux.merge(fastPath, slowPath)
    .map(s -> s.toUpperCase()) // pretend this is a more complex sequence
    .subscribe(System.out:println)
;
forkPoint.connect();
Run Code Online (Sandbox Code Playgroud)

如果 filter() 函数比 % 慢,我想我也可以 groupBy() 然后 filter() 在 key() 上。

请注意,我确实希望 slowPath 和 fastPath 从 beforeFork 点消耗相同的事件,因为 beforeFork 生成速度很慢。

请注意,我确实有一个更复杂的后续操作(即更改为range(1,100)并且围绕预取边界的行为让我感到困惑)-但只有在上述代码段合法时我才有意义。

Reg*_*ans 1

我相信更常见的是这样写:

Flux<Integer> beforeFork = Flux.range(1, 10).publish().autoConnect(2);

Flux<String> slowPath = beforeFork
    .filter(i -> i % 2 == 0)
    .map(i -> "slow"+"_"+i)
    .delayElements(Duration.ofSeconds(1));

Flux<String> fastPath = beforeFork
    .filter(i -> i % 2 != 0)
    .map(i -> "fast"+"_"+i);

Flux.merge(fastPath, slowPath)
    .map(s -> s.toUpperCase())
    .doOnNext(System.out::println)
    .blockLast();
Run Code Online (Sandbox Code Playgroud)

变更摘要:

  • autoConnect(N)- 允许我们指定beforeFork发布者连接到 N 个订阅者之后。如果我们预先知道预期路径的数量,我们可以指定它并防止发布者缓存或重复执行。

  • blockingLast()- 我们阻止加入Flux本身。您可能已经注意到,如果运行当前代码,则只会记录快速结果。这是因为我们实际上并没有等待缓慢的结果完成。

这是假设您的原始数据Publisher是有限的,具有固定数量的元素。Flux.interval对于诸如持续流之类的东西,需要进行其他更改。

因为prefetch我可以向您推荐这个问题: Project Reactor 中的预取意味着什么?