RxJava:数据库和远程服务器

Fra*_*esc 0 android rx-java retrofit2

我有一个要从本地数据库(如果可用)或其他远程服务器检索的对象列表。我正在使用 RxJava Observables(数据库使用 SqlBrite,远程服务器使用 Retrofit)。

我的查询代码如下:

Observable<List<MyObject>> dbObservable = mDatabase
            .createQuery(MyObject.TABLE_NAME,MyObject.SELECT_TYPE_A)
            .mapToList(MyObject.LOCAL_MAPPER);
Observable<List<MyObject>> remoteObservable = mRetrofitService.getMyObjectApiService().getMyObjects();

return Observable.concat(dbObservable, remoteObservable)
    .first(new Func1<List<MyObject>, Boolean>() {
                @Override
                public Boolean call(List<MyObject> myObjects) {
                    return !myObjects.isEmpty();
                }
            });
Run Code Online (Sandbox Code Playgroud)

我看到第一个 observable 正在运行并使用空列表命中第一个方法,但是改造后的 observable 没有运行,没有网络请求。如果我切换 observable 的顺序,或者只是返回远程 observable,它会按预期工作,它会访问远程服务器并返回对象列表。

为什么远程 observable 在这种情况下无法运行?当我首先将 observables 与 db 连接起来,然后再进行改造时,不会调用订阅者的 onNext、orError 和 onComplete 方法。

谢谢!

ehe*_*hhh 5

Kaushik Gopal 在他的RxJava-Android-Samples github 项目中解决了这个问题。

他建议使用这种技术:

getFreshNetworkData()
          .publish(network ->
                         Observable.merge(network,
                                          getCachedDiskData().takeUntil(network)))
          .subscribeOn(Schedulers.io())
          .observeOn(AndroidSchedulers.mainThread())
          .subscribe(new Subscriber<List<MyObject>() {

              ...
          });
Run Code Online (Sandbox Code Playgroud)

在您的情况下,它可能如下所示:

remoteObservable
      .publish(network ->
                     Observable.merge(network,
                                      dbObservable.takeUntil(network)))
      .first(myObjects -> !myObjects.isEmpty());
Run Code Online (Sandbox Code Playgroud)

编辑:听起来你可能只需要这个:

dbObservable
    .flatMap(localResult -> {
        if (localResult.isEmpty()) {
            return remoteObservable;
        } else {
            return Observable.just(localResult);
        }
    });
Run Code Online (Sandbox Code Playgroud)