doe*_*doe 9 java rx-java rx-java2
我有一个用例,我在一个Completable中初始化一些全局变量,并在链的下一步(使用andThen运算符)我使用这些变量.
以下示例详细解释了我的用例
说你上课了 User
class User {
String name;
}
Run Code Online (Sandbox Code Playgroud)
我有一个像这样的Observable,
private User mUser; // this is a global variable
public Observable<String> stringObservable() {
return Completable.fromAction(() -> {
mUser = new User();
mUser.name = "Name";
}).andThen(Observable.just(mUser.name));
}
Run Code Online (Sandbox Code Playgroud)
首先,我正在做一些初始化,我Completable.fromAction希望andThen操作员只有在完成后才能启动Completable.fromAction.
这意味着我希望mUser在andThen运营商启动时进行初始化.
以下是我对此观察的订阅
stringObservable()
.subscribe(s -> Log.d(TAG, "success: " + s),
throwable -> Log.e(TAG, "error: " + throwable.getMessage()));
Run Code Online (Sandbox Code Playgroud)
但是,当我运行此代码时,我收到一个错误
Attempt to read from field 'java.lang.String User.name' on a null object reference
Run Code Online (Sandbox Code Playgroud)
这意味着mUser为null, andThen在执行代码之前启动 Completable.fromAction.这里发生了什么事?
根据文件 andThen
返回一个Observable,它将订阅此Completable,一旦完成,就会订阅{@code next} ObservableSource.此Completable中的错误事件将传播到下游订户,并将导致跳过Observable的订阅.
Sar*_* Kn 30
问题不andThen在于Observable.just(mUser.name)内部声明 andThen.该just运营商将立即尝试创建观察到,虽然它只会发出后Completable.fromAction.
这里的问题是,在尝试创建Observable使用just时,mUseris为null.
解决方案:您需要推迟创建String Observable直到订阅发生,直到andThen开始发射的上游.
代替 andThen(Observable.just(mUser.name));
使用
andThen(Observable.defer(() -> Observable.just(mUser.name)));
Run Code Online (Sandbox Code Playgroud)
要么
andThen(Observable.fromCallable(() -> mUser.name));
Run Code Online (Sandbox Code Playgroud)
我不认为@Sarath Kn 的回答是 100% 正确的。Yesjust将在调用后立即创建 observable,但andThen仍会just在意外的时间调用。
我们可以比较一下andThen,flatMap以获得更好的理解。这是一个完全可运行的测试:
package com.example;
import org.junit.Test;
import io.reactivex.Completable;
import io.reactivex.Observable;
import io.reactivex.observers.TestObserver;
import io.reactivex.schedulers.Schedulers;
public class ExampleTest {
@Test
public void createsIntermediateObservable_AfterSubscribing() {
Observable<String> coldObservable = getObservableSource()
.flatMap(integer -> getIntermediateObservable())
.subscribeOn(Schedulers.trampoline())
.observeOn(Schedulers.trampoline());
System.out.println("Cold obs created... subscribing");
TestObserver<String> testObserver = coldObservable.test();
testObserver.awaitTerminalEvent();
/*
Resulting logs:
Creating observable source
Cold obs created... subscribing
Emitting 1,2,3
Creating intermediate observable
Creating intermediate observable
Creating intermediate observable
Emitting complete notification
IMPORTANT: see that intermediate observables are created AFTER subscribing
*/
}
@Test
public void createsIntermediateObservable_BeforeSubscribing() {
Observable<String> coldObservable = getCompletableSource()
.andThen(getIntermediateObservable())
.subscribeOn(Schedulers.trampoline())
.observeOn(Schedulers.trampoline());
System.out.println("Cold obs created... subscribing");
TestObserver<String> testObserver = coldObservable.test();
testObserver.awaitTerminalEvent();
/*
Resulting logs:
Creating completable source
Creating intermediate observable
Cold obs created... subscribing
Emitting complete notification
IMPORTANT: see that intermediate observable is created BEFORE subscribing =(
*/
}
private Observable<Integer> getObservableSource() {
System.out.println("Creating observable source");
return Observable.create(emitter -> {
System.out.println("Emitting 1,2,3");
emitter.onNext(1);
emitter.onNext(2);
emitter.onNext(3);
System.out.println("Emitting complete notification");
emitter.onComplete();
});
}
private Observable<String> getIntermediateObservable() {
System.out.println("Creating intermediate observable");
return Observable.just("A");
}
private Completable getCompletableSource() {
System.out.println("Creating completable source");
return Completable.create(emitter -> {
System.out.println("Emitting complete notification");
emitter.onComplete();
});
}
}
Run Code Online (Sandbox Code Playgroud)
可以看到,当我们使用 时flatmap,订阅后just会调用,这是有道理的。如果中间 observable 依赖于发射到的项目,那么系统当然不能在订阅之前创建中间 observable。它还没有任何值。你可以想象如果在订阅之前调用这将不起作用:flatmapflatmapjust
.flatMap(integer -> getIntermediateObservable(integer))
Run Code Online (Sandbox Code Playgroud)
奇怪的andThen是能够just在订阅之前创建它的内部可观察对象(即 call )。它可以做到这一点是有道理的。唯一andThen会收到的是一个完整的通知,所以没有理由不提前创建中间 observable。唯一的问题是这不是预期的行为。
@Sarath Kn 的解决方案是正确的,但原因是错误的。如果我们使用,defer我们可以看到事情按预期工作:
@Test
public void usingDefer_CreatesIntermediateObservable_AfterSubscribing() {
Observable<String> coldObservable = getCompletableSource()
.andThen(Observable.defer(this::getIntermediateObservable))
.subscribeOn(Schedulers.trampoline())
.observeOn(Schedulers.trampoline());
System.out.println("Cold obs created... subscribing");
TestObserver<String> testObserver = coldObservable.test();
testObserver.awaitTerminalEvent();
/*
Resulting logs:
Creating completable source
Cold obs created... subscribing
Emitting complete notification
Creating intermediate observable
IMPORTANT: see that intermediate observable is created AFTER subscribing =) YEAY!!
*/
}
Run Code Online (Sandbox Code Playgroud)