RxJava with Firestore real-time data

vih*_*kat 2 android rx-java2

I have repository classes. In these classes I make simple calls like collection("..").get():

override fun getTestCollectionItems(): Observable<TestModel> {

    return Observable.create { subscriber ->

        firebaseFirestore.collection(TEST_COLLECTION)
                .get()
                .addOnCompleteListener { task ->

                    if (task.isSuccessful()) {
                        for (document in task.getResult()) {
                            if (document.exists()) {
                                val documentModel = document.toObject(TestModel::class.java)
                                subscriber.onNext(documentModel)
                            }
                        }
                        subscriber.onComplete()
                    } else {
                        subscriber.onError(task.exception!!)
                    }
                }
    }
}

But then I found Firestore's real-time option. Is it correct to move the snapshot listener into the repository?

I tried the following:

override fun getRealTimeCollection() : Observable<TestModel> {

    return Observable.create { subscriber ->

        firebaseFirestore.collection(TEST_COLLECTION).document("3lPtYZEEhPdfvZ1wfHIP")
            .addSnapshotListener(EventListener<DocumentSnapshot> { snapshot, e ->
                if (e != null) {
                    Log.w("test", "Listen failed.", e)
                    subscriber.onError(e)
                    return@EventListener
                }

                if (snapshot != null && snapshot.exists()) {
                    Log.d("test", "Current data: " + snapshot.data)
                    val documentModel = snapshot.toObject(TestModel::class.java)
                    subscriber.onNext(documentModel)
                } else {
                    Log.d("test", "Current data: null")

                }
            })
    }
}

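I use it with DisposableObservable. But after I dispose it, Firebase still sends new data, which would be a memory leak. How should RxJava be used in this case? Is it correct to move the real-time data into the repository?

Thanks!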

yos*_*riz 5

When you create an Observable with Observable.create, what you actually receive is an ObservableEmitter<T>. On this emitter you should register a Cancellable or a Disposable using setCancellable()/setDisposable() (you can read about the difference here).
These callbacks are triggered when the Observable is disposed, and that is where the Firestore un-registration logic belongs.

override fun getRealTimeCollection(): Observable<TestModel> {

    return Observable.create { emitter ->

        val listenerRegistration = firebaseFirestore.collection(TEST_COLLECTION).document("3lPtYZEEhPdfvZ1wfHIP")
                .addSnapshotListener(EventListener<DocumentSnapshot> { snapshot, e ->
                    if (e != null) {
                        Log.w("test", "Listen failed.", e)
                        emitter.onError(e)
                        return@EventListener
                    }

                    if (snapshot != null && snapshot.exists()) {
                        Log.d("test", "Current data: " + snapshot.data)
                        val documentModel = snapshot.toObject(TestModel::class.java)
                        emitter.onNext(documentModel)
                    } else {
                        Log.d("test", "Current data: null")

                    }
                })
        // Detach the Firestore listener once the Observable is disposed
        emitter.setCancellable { listenerRegistration.remove() }
    }
}
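
To see the effect of setCancellable without a Firebase project, here is a minimal sketch that uses only RxJava 2. FakeSource, addListener, push and listenerCount are hypothetical names made up for this illustration. They stand in for the Firestore document reference and its ListenerRegistration and are not part of any Firebase API.

```kotlin
import io.reactivex.Observable

// Hypothetical stand-in for a Firestore-style listener source (NOT a Firebase API).
class FakeSource {
    private val listeners = mutableListOf<(String) -> Unit>()

    // Registers a listener and returns a function that unregisters it,
    // playing the role of ListenerRegistration.remove().
    fun addListener(listener: (String) -> Unit): () -> Unit {
        listeners.add(listener)
        return { listeners.remove(listener) }
    }

    // Delivers a value to every listener that is still registered.
    fun push(value: String) = listeners.toList().forEach { it(value) }

    val listenerCount: Int get() = listeners.size
}

// Same pattern as getRealTimeCollection(): register the listener,
// then tell the emitter how to unregister it on dispose.
fun observe(source: FakeSource): Observable<String> =
    Observable.create { emitter ->
        val remove = source.addListener { value -> emitter.onNext(value) }
        emitter.setCancellable { remove() }
    }

fun main() {
    val source = FakeSource()
    val received = mutableListOf<String>()

    val disposable = observe(source).subscribe { received.add(it) }
    source.push("first")   // delivered to the subscriber
    disposable.dispose()   // runs the Cancellable, which removes the listener
    source.push("second")  // no listener left, so nothing is delivered

    println(received)             // prints [first]
    println(source.listenerCount) // prints 0
}
```

If you drop the setCancellable line, the subscriber still stops receiving items after dispose(), but listenerCount stays at 1. That corresponds to the situation in the question, where the source keeps the callback alive and keeps invoking it.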