RxJava`Completable.andThen`没有连续执行?

doe*_*doe 9 java rx-java rx-java2

我有一个用例,我在一个Completable中初始化一些全局变量,并在链的下一步(使用andThen运算符)我使用这些变量.

以下示例详细解释了我的用例

说你上课了 User

        class User {
            String name;
        }
Run Code Online (Sandbox Code Playgroud)

我有一个像这样的Observable,

        private User mUser; // this is a global variable

        public Observable<String> stringObservable() {
            return Completable.fromAction(() -> {
                mUser = new User();
                mUser.name = "Name";
            }).andThen(Observable.just(mUser.name));
        }           
Run Code Online (Sandbox Code Playgroud)

首先,我正在做一些初始化,我Completable.fromAction希望andThen操作员只有在完成后才能启动Completable.fromAction.

这意味着我希望mUserandThen运营商启动时进行初始化.

以下是我对此观察的订阅

             stringObservable()
            .subscribe(s -> Log.d(TAG, "success: " + s),
                    throwable -> Log.e(TAG, "error: " + throwable.getMessage()));
Run Code Online (Sandbox Code Playgroud)

但是,当我运行此代码时,我收到一个错误

          Attempt to read from field 'java.lang.String User.name' on a null object reference
Run Code Online (Sandbox Code Playgroud)

这意味着mUser为null, andThen在执行代码之前启动 Completable.fromAction.这里发生了什么事?

根据文件 andThen

返回一个Observable,它将订阅此Completable,一旦完成,就会订阅{@code next} ObservableSource.此Completable中的错误事件将传播到下游订户,并将导致跳过Observable的订阅.

Sar*_* Kn 30

问题不andThen在于Observable.just(mUser.name)内部声明 andThen.该just运营商将立即尝试创建观察到,虽然它只会发出后Completable.fromAction.

这里的问题是,在尝试创建Observable使用just时,mUseris为null.

解决方案:您需要推迟创建String Observable直到订阅发生,直到andThen开始发射的上游.

代替 andThen(Observable.just(mUser.name));

使用

 andThen(Observable.defer(() -> Observable.just(mUser.name)));
Run Code Online (Sandbox Code Playgroud)

要么

 andThen(Observable.fromCallable(() -> mUser.name));
Run Code Online (Sandbox Code Playgroud)

  • `Observable.just(T)` 文档指出:“请注意,该项目被采用并**重新发出**,而不是通过任何方式仅通过任何方式计算。使用 `fromCallable(Callable)` 来**生成单个项目** 按需(当观察者_订阅它_)。” (2认同)
  • 是。答案中也有提到 (2认同)

tir*_*r38 8

我不认为@Sarath Kn 的回答是 100% 正确的。Yesjust将在调用后立即创建 observable,但andThen仍会just在意外的时间调用。

我们可以比较一下andThenflatMap以获得更好的理解。这是一个完全可运行的测试:

package com.example;

import org.junit.Test;

import io.reactivex.Completable;
import io.reactivex.Observable;
import io.reactivex.observers.TestObserver;
import io.reactivex.schedulers.Schedulers;

public class ExampleTest {

    @Test
    public void createsIntermediateObservable_AfterSubscribing() {
        Observable<String> coldObservable = getObservableSource()
                .flatMap(integer -> getIntermediateObservable())
                .subscribeOn(Schedulers.trampoline())
                .observeOn(Schedulers.trampoline());
        System.out.println("Cold obs created... subscribing");
        TestObserver<String> testObserver = coldObservable.test();
        testObserver.awaitTerminalEvent();

        /*
        Resulting logs:

        Creating observable source
        Cold obs created... subscribing
        Emitting 1,2,3
        Creating intermediate observable
        Creating intermediate observable
        Creating intermediate observable
        Emitting complete notification

        IMPORTANT: see that intermediate observables are created AFTER subscribing
         */
    }

    @Test
    public void createsIntermediateObservable_BeforeSubscribing() {
        Observable<String> coldObservable = getCompletableSource()
                .andThen(getIntermediateObservable())
                .subscribeOn(Schedulers.trampoline())
                .observeOn(Schedulers.trampoline());
        System.out.println("Cold obs created... subscribing");
        TestObserver<String> testObserver = coldObservable.test();
        testObserver.awaitTerminalEvent();

        /*
        Resulting logs:

        Creating completable source
        Creating intermediate observable
        Cold obs created... subscribing
        Emitting complete notification

        IMPORTANT: see that intermediate observable is created BEFORE subscribing =(
         */
    }

    private Observable<Integer> getObservableSource() {
        System.out.println("Creating observable source");
        return Observable.create(emitter -> {
            System.out.println("Emitting 1,2,3");
            emitter.onNext(1);
            emitter.onNext(2);
            emitter.onNext(3);
            System.out.println("Emitting complete notification");
            emitter.onComplete();
        });
    }

    private Observable<String> getIntermediateObservable() {
        System.out.println("Creating intermediate observable");
        return Observable.just("A");
    }

    private Completable getCompletableSource() {
        System.out.println("Creating completable source");
        return Completable.create(emitter -> {
            System.out.println("Emitting complete notification");
            emitter.onComplete();
        });
    }
}
Run Code Online (Sandbox Code Playgroud)

可以看到,当我们使用 时flatmap,订阅just会调用,这是有道理的。如果中间 observable 依赖于发射的项目,那么系统当然不能在订阅之前创建中间 observable。它还没有任何值。你可以想象如果在订阅之前调用这将不起作用:flatmapflatmapjust

.flatMap(integer -> getIntermediateObservable(integer))
Run Code Online (Sandbox Code Playgroud)

奇怪的andThen是能够just在订阅之前创建它的内部可观察对象(即 call )。它可以做到这一点是有道理的。唯一andThen会收到的是一个完整的通知,所以没有理由不提前创建中间 observable。唯一的问题是这不是预期的行为。

@Sarath Kn 的解决方案是正确的,但原因是错误的。如果我们使用,defer我们可以看到事情按预期工作:

@Test
public void usingDefer_CreatesIntermediateObservable_AfterSubscribing() {
    Observable<String> coldObservable = getCompletableSource()
            .andThen(Observable.defer(this::getIntermediateObservable))
            .subscribeOn(Schedulers.trampoline())
            .observeOn(Schedulers.trampoline());
    System.out.println("Cold obs created... subscribing");
    TestObserver<String> testObserver = coldObservable.test();
    testObserver.awaitTerminalEvent();

    /*
    Resulting logs:

    Creating completable source
    Cold obs created... subscribing
    Emitting complete notification
    Creating intermediate observable

    IMPORTANT: see that intermediate observable is created AFTER subscribing =) YEAY!!
     */
}
Run Code Online (Sandbox Code Playgroud)