从数据库+网络加载数据(Room + Retrofit + RxJava2)

bli*_*ard 4 android retrofit2 rx-java2 android-room

我有一个示例API请求,它返回用户监视列表的列表.我想在用户加载监视列表屏幕时实现以下流程:

  1. 立即从DB缓存加载数据.(cacheWatchList)

  2. 在后台启动RetroFit网络呼叫.

    一世.onSuccess返回apiWatchList
    ii.onError返回cacheWatchList

  3. Diff cacheWatchListvs.apiWatchList

    一世.相同 - >一切都很好,因为数据已经显示给用户什么都不做.

    II.不同 - >保存apiWatchList到本地商店并发apiWatchList送到下游.

到目前为止我做了什么?

Watchlist.kt

data class Watchlist(
  val items: List<Repository> = emptyList()
)
Run Code Online (Sandbox Code Playgroud)

LocalStore.kt(Android室)

  fun saveUserWatchlist(repositories: List<Repository>): Completable {    
    return Completable.fromCallable {      
      watchlistDao.saveAllUserWatchlist(*repositories.toTypedArray())
    }
  }
Run Code Online (Sandbox Code Playgroud)

RemoteStore.kt(改造api调用)

  fun getWatchlist(userId: UUID): Single<Watchlist?> {
    return api.getWatchlist(userId)
  }
Run Code Online (Sandbox Code Playgroud)

DataManager.kt

  fun getWatchlist(userId: UUID): Flowable<List<Repository>?> {
    val localSource: Single<List<Repository>?> =
      localStore.getUserWatchlist()
        .subscribeOn(scheduler.computation)

    val remoteSource: Single<List<Repository>> = remoteStore.getWatchlist(userId)
      .map(Watchlist::items)
      .doOnSuccess { items: List<Repository> ->
        localStore.saveUserWatchlist(items)
          .subscribeOn(scheduler.io)
          .subscribe()
      }
      .onErrorResumeNext { throwable ->
        if (throwable is IOException) {
          return@onErrorResumeNext localStore.getUserWatchlist()
        }
        return@onErrorResumeNext Single.error(throwable)
      }
      .subscribeOn(scheduler.io)

    return Single.concat(localSource, remoteSource)
  }
Run Code Online (Sandbox Code Playgroud)

上述流程的问题是,即使两个数据都相同,它也会为每个流源调用onNext 两次到下游(演示者).

我可以在演示者中进行数据差异逻辑并相应地更新,但我希望DataManager该类为我处理逻辑(CleanArchitecture,SOC).

我的问题?

  1. 实现上述逻辑的最佳方法是什么?

  2. 我是否泄漏了DataManager中的内部订阅(参见:doOnSuccess代码)?当演示者被销毁时,我正在处理外部订阅.

Okn*_*sif 6

fun getWatchlist(userId: UUID): Observable<List<Repository>?> {
val remoteSource: Single<List<Repository>> = 
remoteStore.getWatchlist(userId)
        .map(Watchlist::items)
        .subscribeOn(scheduler.io)

return localStore.getUserWatchlist()
        .flatMapObservable { listFromLocal: List<Repository> ->
            remoteSource
                    .observeOn(scheduler.computation)
                    .toObservable()
                    .filter { apiWatchList: List<Repository> ->
                        apiWatchList != listFromLocal
                    }
                    .flatMapSingle { apiWatchList ->
                        localSource.saveUserWatchlist(apiWatchList)
                                .andThen(Single.just(apiWatchList))
                    }
                    .startWith(listFromLocal)
        }
}
Run Code Online (Sandbox Code Playgroud)

逐步说明:

  1. 从localStore加载数据
  2. 每次localStore发出数据时,使用flatMapObservable订阅remoteSource.
  3. 由于内部observable有多个发射(来自remoteSource的更新数据,来自本地和新数据的初始数据)将Single转换为Observable.
  4. 将来自remoteSource的数据与来自localStore的数据进行比较,并仅在newData!= localData的情况下继续数据.
  5. 对于过滤器启动localSource以保存数据后的每次发射,并在完成此操作后,将保存的数据保存为Single.
  6. 根据要求,在远程请求开始时,应该继续执行来自localStore的数据,并且只需在运营商链的末尾添加startWith即可.