RxJava subscribeOn和observeOn不会覆盖之前设置的原始Scheduler?

Ata*_*anL 4 android rx-java

我在Android中使用了RxJava和Retrofit 2,我在subscribe()之前调用了subscribeOn(Schedulers.io())android observeOn(AndroidSchedulers.mainThread())全局.但是,有时我想调用subscribeOn(Schedulers.immediate())android observeOn(Schedulers.immediate())来覆盖Scheduler之前设置的同步进程.但我发现它不起作用,android工作仍然会在io()线程上处理,android结果由mainThread()处理.为什么?

Ric*_*rdo 6

这就是RxJava的工作方式.

看看这个视频教程,从12:50开始.因此,视频中的示例:

Observable.just(1, 2, 3)
    .subscribeOn(Schedulers.newThread())
    .subscribeOn(Schedulers.io())
    .subscribe(System.out::println);
Run Code Online (Sandbox Code Playgroud)

会发生什么事情就是把subscribeOn()所有电话都堵住.在这种情况下,subscribeOn(Schedulers.io())首先生成并在io线程上订阅它上面的所有内容.但接下来subscribeOn(Schedulers.newThread())会产生并且优先考虑(因为它被称为最后一个)而是在其上订阅所有内容.没有建立一个线程链.在这个例子中,你基本上没有任何理由产生io线程.

为了更好地处理这些subscribeOn()observeOn()方法,我建议你看一下同一位视频作者的这篇文章.他建议使用a Transformer来包含对这些方法的调用:

Transformer实际上只是Func1<Observable<T>, Observable<R>>.换句话说:喂它Observable一种类型,它将返回Observable另一种类型 .这与内联调用一系列运算符完全相同.

这样,您可以使用如下方法:

<T> Transformer<T, T> applySchedulers() {  
    return observable -> observable.subscribeOn(Schedulers.io())
        .observeOn(AndroidSchedulers.mainThread());
}
Run Code Online (Sandbox Code Playgroud)

或者,如果要重用变换器,可以进行以下设置:

final Transformer schedulersTransformer =  
    observable -> observable.subscribeOn(Schedulers.io())
        .observeOn(AndroidSchedulers.mainThread());

@SuppressWarnings("unchecked")
<T> Transformer<T, T> applySchedulers() {  
    return (Transformer<T, T>) schedulersTransformer;
}
Run Code Online (Sandbox Code Playgroud)

然后上面的例子看起来像:

Observable.just(1, 2, 3)
    .compose(applySchedulers())
    .subscribe(System.out::println);
Run Code Online (Sandbox Code Playgroud)

希望有所帮助.