带有 Room 错误处理的 RxJava2 - 数据库主线程异常

J-m*_*-me 0 database error-handling android reactive

我正在尝试从远程源检索项目,如果这不起作用(没有互联网),我想从房间数据库中检索缓存的项目。我为发生错误时创建了一个新单曲,并指定了它应该订阅和观察的线程。我仍然是这个例外:

java.lang.IllegalStateException: Cannot access database on the main thread since it may potentially lock the UI for a long period of time.
Run Code Online (Sandbox Code Playgroud)

这是检索项目的方法:

public LiveData<List<Article>> getNewsArticles() {
    return LiveDataReactiveStreams.fromPublisher(
            newsService.getNewsArticles()
                    .subscribeOn(Schedulers.io())
                    .observeOn(AndroidSchedulers.mainThread())

                    .onErrorResumeNext(throwable ->
                            Single.just(newsDao.findAllForNumber(AMOUNT_OF_ARTICLES_PER_PAGE))
                                    .subscribeOn(Schedulers.io())
                                    .observeOn(AndroidSchedulers.mainThread()))

                    .subscribeOn(Schedulers.io())
                    .observeOn(AndroidSchedulers.mainThread())

                    .doOnSuccess(newsArticles -> Completable.fromAction(() ->
                            newsDao.insertAll(newsArticles))
                            .subscribeOn(Schedulers.io()))
                    .toFlowable());
}
Run Code Online (Sandbox Code Playgroud)

LiveDataReactiveStreams 将其转换为 livedata 对象并处理订阅,所以我的视图只知道 livedata。

我似乎无法让 onErrorResumeNext 调用在后台线程上工作。

任何帮助将不胜感激!

回答

我最终用以下代码解决了这个问题:

public LiveData<List<Article>> getNewsArticles() {
    return LiveDataReactiveStreams.fromPublisher(
            newsService.getNewsArticles()
                    .observeOn(Schedulers.io())
                    .doOnSuccess(newsArticles -> newsDao.insertAll(newsArticles))
                    .onErrorResumeNext(throwable -> Single.fromCallable(() -> newsDao.findAllForNumber(AMOUNT_OF_ARTICLES_PER_PAGE)))
                    .toFlowable());
}
Run Code Online (Sandbox Code Playgroud)

Dmy*_*nov 5

在 RxJava 方法subscribeOn 中指定 Observable 将在其上运行的调度程序。但是方法observeOn指定了观察者将在其上观察这个Observable 的调度器。

简单来说:

Single
.zip(observable1.getList(), observable2.getAnotherList()) // Simple zip for example
.observeOn(AndroidSchedulers.mainThread()) // switch to main thread
.map(mapper.map(list1, list2)) // this command will execute on main thread
.observeOn(Schedulers.io()) // switch to io thread
.map(anotherMapper.map(complexList)) // this command will execute on io thread
.observeOn(AndroidSchedulers.mainThread()) // switch to main thread
.subscribeOn(Schedulers.io()) // specify thread for zip command
Run Code Online (Sandbox Code Playgroud)

ObserveOn 仅适用于下游。observeOn 之后的所有方法都移到了 IO 线程中。而observeOn 之前的方法仍然在主线程中。

在您的示例中,您尝试在主线程上调用 room dao 命令,而系统不允许在主线程上执行。您可以在每个命令上设置断点,然后在 IDE 中查看将执行命令的线程名称。

有关更复杂的示例,请参阅本文