在Rx链中多次切换线程

Евг*_*цов 8 android rx-java rx-java2

我们假设我有以下Android案例:

  1. 请求来自网络的组列表
  2. 显示每个组的一些UI元素
  3. 请求每个组的项目
  4. 显示每个项目的UI元素

我想用RxJava做到这一点:

webService.requestGroups()
        .flatMap(group -> {
            view.showGroup(group);
            return webService.requestItems(group);
        })
        .toList()
        .subscribeOn(Schedulers.io())
        .observeOn(AndroidSchedulers.mainThread())

        .subscribe(items -> view.showItems(items));
Run Code Online (Sandbox Code Playgroud)

正如您所看到的,我有2个视图对象调用,每个调用都必须在主线程上执行.并且2次调用webService,必须在后台线程上执行.

这段代码的问题:首先调用view会在后台执行导致Android RuntimeException(只有原始线程可能会触及视图或者其他东西)如果我转移.observeOn到链的开头 - 第二次webService调用将在主线程中执行.

如何在RxJava链中多次"游"穿过线程?

Sam*_*ter 15

来自SubscribeOnRx doc:

SubscribeOn运算符指定Observable将开始操作的线程,无论调用运算符的运算符链中的哪一点.另一方面,ObserveOn会影响Observable将在该运算符出现的位置使用的线程.因此,您可以在Observable运算符链中的不同点多次调用ObserveOn,以便更改某些运算符运行的线程.

SubscribeOn运算符只能应用一次并设置起始线程.ObserveOn可用于在流中的任何位置从一个线程转到另一个线程.所以我认为以下应该做你想要的:

webService.requestGroups()
    .subscribeOn(Schedulers.io())
    .observeOn(AndroidSchedulers.mainThread())
    .flatMap(group -> {
        view.showGroup(group);
        return webService.requestItems(group)
                         .subscribeOn(Schedulers.io());
    })
    .toList()
    .observeOn(AndroidSchedulers.mainThread())
    .subscribe(items -> view.showItems(items));
Run Code Online (Sandbox Code Playgroud)

但在我看来,这太复杂了.我只想订阅第一个observable,然后为每个组启动一个新链,如下所示:

webService.requestGroups()
    .subscribeOn(Schedulers.io())
    .observeOn(AndroidSchedulers.mainThread())
    .subscribe(group -> { 
        view.showGroup(group);
        webService.requestItems(group)
            .subscribeOn(Schedulers.io()
            .observeOn(AndroidSchedulers.mainThread())
            .subscribe(items -> view.showItems(items));
    });
Run Code Online (Sandbox Code Playgroud)

  • subscribeOn()的@IgorGanapolsky在链中的位置无关紧要-第一个SubscribeOn()将是整个链用于启动订阅的线程(无论是第一个链接方法调用还是最后一个链接方法调用)。相反,`observeOn()`会为该链中指向该点之后的所有内容更改线程。但是请注意,在塞缪尔(Samuel)的第二个代码块中,发生了两个完全分开的Rx链。requestItems()链未映射到requestGroups()链。 (2认同)

jav*_*ian 9

以 Samuel 的回答为基础,您可以使用更简单的非嵌套语法来实现:

webService.requestGroups()
.subscribeOn(Schedulers.io()) // the first operator (requestGroups) on the IO thread
.observeOn(AndroidSchedulers.mainThread()) //everything below on the main thread
.map(group -> {
    view.showGroup(group);
    return group;
})
.observeOn(Schedulers.io()) //everything below on the IO thread
.flatMap(group -> {
    return webService.requestItems(group);
})
.toList()
.observeOn(AndroidSchedulers.mainThread()) //everything below on the main thread
.subscribe(items -> view.showItems(items));
Run Code Online (Sandbox Code Playgroud)

这里有两个经验法则:

  1. subscribeOn 决定了 observable 将在哪个线程开始执行,它在链中的位置是无关紧要的,它应该只出现一次。
  2. observeOn告诉所有后续操作符将在哪个线程上执行(直到observeOn遇到另一个);它可能会在链中多次出现,改变不同代码片段的执行线程(如上例所示)。