Reactor撰写vs FlatMap

pau*_*aul 3 java spring project-reactor

我继续玩Reactor,现在我看到compose操作符的行为完全一样flatMap,我想知道是否存在我不理解的差异。

    @Test
public void compose() throws InterruptedException {
    Scheduler mainThread = Schedulers.single();
    Flux.just(("old element"))
            .compose(element ->
                    Flux.just("new element in new thread")
                            .subscribeOn(mainThread)
                            .doOnNext(value -> System.out.println("Thread:" + Thread.currentThread().getName())))
            .doOnNext(value -> System.out.println("Thread:" + Thread.currentThread().getName()))
            .subscribe(System.out::println);
    Thread.sleep(1000);
}

@Test
public void flatMapVsCompose() throws InterruptedException {
    Scheduler mainThread = Schedulers.single();
    Flux.just(("old element"))
            .flatMap(element ->
                    Flux.just("new element in new thread")
                            .subscribeOn(mainThread)
                            .doOnNext(value -> System.out.println("Thread:" + Thread.currentThread().getName())))
            .doOnNext(value -> System.out.println("Thread:" + Thread.currentThread().getName()))
            .subscribe(System.out::println);
    Thread.sleep(1000);
}
Run Code Online (Sandbox Code Playgroud)

这两个示例的行为并返回相同的结果。

问候。

pvp*_*ran 7

@Andrew的解释非常好。只是想添加一个示例以更好地理解。

Flux.just("1", "2")
        .compose( stringFlux -> {
            System.out.println("In compose"); // It takes whe whole Flux as input
           return stringFlux.collectList();
        }).subscribe(System.out::println);


Flux.just("1", "2").flatMap(s -> { //Input to the anonymous function is individual items in stream
            System.out.println("In flatMap");
            return Flux.just(Integer.parseInt(s));
        }).subscribe(System.out::println);
Run Code Online (Sandbox Code Playgroud)

这产生输出

In compose
[1, 2]
In flatMap
1
In flatMap
2
Run Code Online (Sandbox Code Playgroud)

表示compose适用于整个流,但flatMap适用于流中的单个项目


And*_*lko 5

丹·卢的出色解释

不同之compose()处在于是更高级别的抽象:它在整个流上运行,而不是单独发出的项目。具体来说:

  • compose()Observable<T>从流中获取原始图像的唯一方法。因此,影响整个流(如subscribeOn()observeOn())的运算符需要使用compose()

    相反,如果放置subscribeOn()/ observeOn()in flatMap(),则只会影响Observable您在in中创建的内容,flatMap()而不会影响流的其余部分。

  • compose()创建Observable流时立即执行,就像您已内联编写操作符一样。flatMap()onNext()每次调用它时执行。换句话说,flatMap()转换每个项目,而compose()转换整个流。

  • flatMap()效率一定较低,因为Observable每次onNext()调用都必须创建一个新的。compose()按原样在流上操作。如果要用可重用的代码替换某些运算符,请使用compose()flatMap()有很多用途,但这不是其中之一。