为什么在RxJava中通过merge实现flatMap?

sou*_*ton 0 rx-java flatmap rx-android

为什么RxJava 1.x flatMap()运算符是通过merge实现的?

public final <R> Observable<R> flatMap(Func1<? super T, ? extends Observable<? extends R>> func) {
        if (getClass() == ScalarSynchronousObservable.class) {
            return ((ScalarSynchronousObservable<T>)this).scalarFlatMap(func);
        }
        return merge(map(func));
    }
Run Code Online (Sandbox Code Playgroud)

从flatMap()调用中,我只能返回一个符合的Observable <? extends Observable<? extends R>>。比map(func)调用将它包装到另一个Observable中,这样我们就得到了这样的东西Observable<? extends Observable<? extends R>>。这使我认为map(func)之后的merge()调用是不必要的。

据说merge()运算符执行以下操作:

将可发射Observable的Observable展平为单个Observable,该Observable发射那些Observable发射的项目,而无需进行任何转换。

现在,在平面地图内,我们只能有一个Observable发出一个Observable。为什么要合并?我在这里想念什么?

谢谢。

Sim*_*slé 6

查看签名可能会有所帮助:

想象一下,Observable<String>您想要flatMap一个单独的角色。使用flatMap的方法是:

Observable.just("foo", "hello")
          .flatMap(s -> Observable.from(s.split("")))
Run Code Online (Sandbox Code Playgroud)

此可观察的类型是什么?这是一个Observable<String>

现在,不要使用flatMap,而是使用map 相同的功能。什么类型的

Observable.just("foo", "hello")
          .map(s -> Observable.from(s.split("")))
Run Code Online (Sandbox Code Playgroud)

您会看到它实际上是Observable<Observable<String>>...并且,如果我们订阅此可观察的内容并打印出发出的项目,我们将得到:

rx.Observable@5b275dab
rx.Observable@61832929
Run Code Online (Sandbox Code Playgroud)

不太有用。更糟糕的是,这些Observable尚未被订阅,因此它们不会发出任何数据:(

我们看到的目标flatMap是让函数为Observable<T>每个源项生成一个内部,然后订阅这些内部可观察变量,并在输出中将它们的发射放在一起Observable<T>。并merge做到了!

要验证这一点,请将上面的地图结果包装在中Observable.merge(...)

Observable<Observable<String>> mapped = 
    Observable.just("foo", "hello")
              .map(s -> Observable.from(s.split("")));

Observable.merge(mapped)
          .subscribe(System.out::println);
Run Code Online (Sandbox Code Playgroud)

输出:

f
o
o
h
e
l
l
o
Run Code Online (Sandbox Code Playgroud)