如何使用Realm asObservable和RxJava的concat()运算符?

ar-*_*r-g 5 android realm rx-java

我正在尝试使用Realm与RxJava和Retrofit,DanLew 在这里描述的方式汇总来自领域和改造的输入但是如果我将链接添加到链中它会被卡住

Observable.concat(countryStorage.restoreAsObservable(),
              networkService.api()
                  .getCountries()
                  .doOnNext(countryStorage::save))
              .first()
              .observeOn(AndroidSchedulers.mainThread())
              .subscribe(//never reaching here)
Run Code Online (Sandbox Code Playgroud)

存储

 @Override public Observable<List<Country>> restoreAsObservable() {
        Realm realm = realmProvider.get();
        return realm.where(Country.class)
            .findAll()
            .asObservable()
            .map(countries -> return realm.copyFromRealm(countries))
            .first(countries -> return !countries.isEmpty())
            .doOnCompleted(realm::close());
      }
Run Code Online (Sandbox Code Playgroud)

似乎这可能发生,可观察到的是来自Realm的热点,但在文档中没有任何关于它的内容以及我如何与其他可观察者组成Realm?

更新: 它变成了旧的方式它工作正常.关于新api的问题仍然存在.

return Observable.just(
        realm.copyFromRealm(realm.where(Country.class).findAll()))
        .filter(countries -> !countries.isEmpty())
        .doOnCompleted(realm::close);
Run Code Online (Sandbox Code Playgroud)

Ale*_*yev 0

发生这种情况是因为countryStorage.restoreAsObservable()永远不会完成,如果您阅读concat文档,它明确指出:

Concat 等待订阅您传递给它的每个附加 Observable,直到前一个 Observable 完成。

相反,你可以这样做:

    countryStorage.restoreAsObservable()
          .doOnSubscribe(() -> {
              networkService.api()
                  .getCountries()
                  .subscribe(countryStorage::save)
          })
          .observeOn(AndroidSchedulers.mainThread())
          .subscribe(//do smth)
Run Code Online (Sandbox Code Playgroud)