RxJava - 如何设置Observer来阻止

j2e*_*nue 3 multithreading rx-java

我希望我Observable阻止,直到操作完成,然后继续下一个方法调用等.看看这段代码:

import rx.Observable;
import rx.android.schedulers.AndroidSchedulers;
import rx.functions.Action1;
import rx.functions.Func1;

Observable observer1 = Observable.just(1, 2, 3)
        .observeOn(AndroidSchedulers.mainThread());

Observable observer2 = observer1.map(new Func1<Integer, Integer>() {
    @Override
    public Integer call(Integer myint) {
        //multiples each int by 2
        return myint * 2;
    }
});

observer2.observeOn(AndroidSchedulers.mainThread())
        .subscribeOn(AndroidSchedulers.mainThread());

observer2.subscribe(new Action1<Integer>() {
    @Override
    public void call(Integer i) {
        System.out.println("this is the Integer multiplied by two:" + i);
    }
});

System.out.println("I want this statement to come after multiplication completes");
Run Code Online (Sandbox Code Playgroud)

我意识到我可以使用onComplete回电,但这不是我的观点.我试图找出如何阻止观察者,直到它完成,然后继续我的其余代码.此时日志看起来像这样:

I/System.out? I want this statement to come after multiplication completes
I/System.out? this is the Integer multiplied by two:2
I/System.out? this is the Integer multiplied by two:4
I/System.out? this is the Integer multiplied by two:6
Run Code Online (Sandbox Code Playgroud)

还要注意我在MainThread上观察和订阅所有内容,如果我没有指定,这是默认完成的吗?

kjo*_*nes 11

如果要阻塞直到Observable完成使用observable.toBlocking().forEach()而不是subscribe().

observer2
    .toBlocking()
    .forEach(new Action1<Integer>() {
        @Override
        public void call(Integer i) {
            System.out.println("this is the Integer multiplied by two:" + i);
        }
    });
Run Code Online (Sandbox Code Playgroud)

除了获得期望的效果之外,还可以使用许多阻塞可观察操作符forEach().例如,如果您只需要发出的第一个项目,则使用observable.toBlocking().first()

另请注意,RxJava API会为您正在进行的每个调用返回一个新的Observable.因此,以下行对observable2使用的调度程序没有影响.

observer2.observeOn(AndroidSchedulers.mainThread()).subscribeOn(AndroidSchedulers.mainThread());
Run Code Online (Sandbox Code Playgroud)

它确实使用指定的调度程序创建一个新的Observable,但抛出它,因为返回的Observable没有分配给任何变量.您可以执行以下操作.

observer2
    .observeOn(AndroidSchedulers.mainThread())
    .subscribeOn(AndroidSchedulers.mainThread())
    .toBlocking()
    .forEach(new Action1<Integer>() {
        @Override
        public void call(Integer i) {
            System.out.println("this is the Integer multiplied by two:" + i);
        }
    });
Run Code Online (Sandbox Code Playgroud)