在后台线程上处理Observable

Won*_*abo 20 rx-java rx-android

我正在使用RxAndroid进行流操作.在我的实际用例中,我从服务器获取列表(使用Retrofit).我正在使用调度程序在后台线程上完成工作,并在Android UI(主)线程上获得最终发射.

这适用于网络调用,但是我意识到网络调用后我的运算符不使用后台线程,而是在主线程上调用.

myService.fetchSomeIntegersFromServer()
        .subscribeOn(Schedulers.newThread())
        .observeOn(AndroidSchedulers.mainThread())
        .filter(integer -> {
            System.out.println(Looper.getMainLooper().getThread() == Thread.currentThread());
            return true;
        })
        .subscribe(integer1 -> {});
Run Code Online (Sandbox Code Playgroud)

如何确保所有操作都在后台线程上执行?

nha*_*man 58

TL; DR:observeOn(AndroidSchedulers.mainThread())向下移动filter(...).


subscribeOn(...)用于指定Observable开始操作的线程.后续调用subscribeOn将被忽略.

因此,如果您要编写以下内容,则所有内容都将在以下位置执行Schedulers.newThread():

myService.fetchSomeIntegersFromServer()
        .subscribeOn(Schedulers.newThread())
        .filter(integer -> {
            System.out.println(Looper.getMainLooper().getThread() == Thread.currentThread());
            return true;
        })
        .subscribe(integer1 -> { doSomething(integer1); });
Run Code Online (Sandbox Code Playgroud)

当然,现在,这不是你想要的:你想要doSomething在主线程上.
这就是observeOn到位的地方.所有操作 observeOn都对调度执行.因此,在您的示例中,将filter在主线程上执行.

相反,observeOn向下移动到之前subscribe:

myService.fetchSomeIntegersFromServer()
        .subscribeOn(Schedulers.newThread())
        .filter(integer -> {
            System.out.println(Looper.getMainLooper().getThread() == Thread.currentThread());
            return true;
        })
        .observeOn(AndroidSchedulers.mainThread())
        .subscribe(integer1 -> { doSomething(integer1) });
Run Code Online (Sandbox Code Playgroud)

现在,filter将发生在'新线程'和doSomething主线程上.


为了更进一步,您可以observeOn多次使用:

myService.fetchSomeIntegersFromServer()
        .subscribeOn(Schedulers.newThread())
        .observeOn(Schedulers.computation())
        .filter(integer -> {
            System.out.println(Looper.getMainLooper().getThread() == Thread.currentThread());
            return true;
        })
        .observeOn(AndroidSchedulers.mainThread())
        .subscribe(integer1 -> { doSomething(integer1) });
Run Code Online (Sandbox Code Playgroud)

在这种情况下,提取将在新线程上进行,在计算线程上进行过滤,doSomething在主线程上进行.

Checkout ReactiveX -用于官方文档的SubscribeOn运算符.

  • 很棒的答案,10倍! (2认同)