在 Reactor 3 中将 Flux 拆分为多个 Flux 的最有效方法

Pro*_*tle 4 reactive-programming project-reactor

在 Reactor 3 中,通过模式匹配将异构通量拆分为多个通量的最有效方法是什么?(而且对每个通量的后续操作可能会大不相同)

例如,

Source Flux: a->b->c->a->b->c
 ||
 vv
A Flux: a->a->a
B Flux: b->b->b
C Flux: c->c->c
Run Code Online (Sandbox Code Playgroud)

我是反应式编程的新手,我想出的唯一解决方案是share()+ filter(),比如

val shared = flux.share();
shared.filter(x -> x.tag=='a').subscribe(a -> consumeA(a));
shared.filter(x -> x.tag=='b').subscribe(b -> consumeB(b));
shared.filter(x -> x.tag=='c').subscribe(c -> consumeC(c));
Run Code Online (Sandbox Code Playgroud)

这是最好的解决方案,还是有更好的范式来解决这个问题?

Phi*_*lay 8

如果组的数量相当少,那么您可以使用项目反应器文档中的Flux.groupBy引用

例如:

Flux<String> flux = Flux.just("a1", "b1", "c1", "a2", "b2", "c2")
        .groupBy(s -> s.charAt(0))
        .concatMap(groupedFlux -> groupedFlux
                .startWith("Group " + groupedFlux.key()));

StepVerifier.create(flux)
        .expectNext("Group a", "a1", "a2")
        .expectNext("Group b", "b1", "b2")
        .expectNext("Group c", "c1", "c2")
        .verifyComplete();
Run Code Online (Sandbox Code Playgroud)

您可以使用groupedFlux.key()改变为每个组执行的操作。

  • “组数太少”?10&lt;、100&lt;、1000&lt;...是什么意思? (2认同)