压缩室可流动防止更新

Gab*_*tin 4 android rx-java2 android-room

getPlaces我的存储库中有一个方法:

override fun getPlaces(filter: FilterRequest): Flowable<List<Place>> {
    return from(placesApi.filter(filter))
            .doOnSuccess {
                placesDao.savePlaces(it)
            }
            .flatMapPublisher { it ->
                placesDao.getPlaces(it.map { it.placeId })
            }
}
Run Code Online (Sandbox Code Playgroud)

此方法从api收集结果,然后将结果保存在数据库中,并返回一个带有flow的流,其中包含通过id从数据库中检索到的位置Flowable

@Query("select * from Places where placeId in (:placesIds)")
fun getPlaces(placesIds: List<String>) : Flowable<List<Place>>
Run Code Online (Sandbox Code Playgroud)

现在,每次我更改其中一个对象时,就可以在整个应用程序中看到所有更改。

现在,我想将这些结果与到当前位置的距离结合起来,如下所示:

 override fun addDistanceToPlaces(req: Flowable<List<Place>>): Flowable<List<Place>> {
        return req
                .zipWith(getLastLocation().toFlowable(BackpressureStrategy.LATEST),
                        BiFunction<List<Place>, Location, List<Place>> { places, location ->
                            places.forEach {
                                var placeLocation = Location(it.placeName)
                                placeLocation.latitude = it.latitude
                                placeLocation.longitude = it.longitude

                                it.distance = location.distanceTo(placeLocation)
                            }
                            places.sortedBy {
                                it.distance
                            }
                        })
                .onErrorResumeNext { t: Throwable ->
                    req
                }

    }
Run Code Online (Sandbox Code Playgroud)

此方法有效,但是如果应用此方法,则会丢失Room的“更新”;更改不会通知观察者,因此我必须进行手动刷新。

为什么会这样呢?不zip应该将两种来源的排放量结合起来吗?

小智 6

您的问题是尝试将zip运算符用于您的用例。通过对输入的可观察值进行配对来发射Zip。它不会为您的单个可观察对象的每一次更改发出,而是在它们都发出时发出。查看大理石,以帮助您形象地观察其行为:

http://reactivex.io/documentation/operators/zip.html

因此,在您的情况下,Room Observable正在发送到您的zip函数中,但是Observable的位置没有更新,因此您不会调用该函数。

我认为您正在寻找combineLatest运营商。这将一直等到Room Observable和Location Observable都售出一次,然后再发出其中一个Observable并将调用您的Combine函数并将后续值发送到您的应用程序。