使用协程的 Firebase 实时快照侦听器

svk*_*aka 16 android kotlin google-cloud-firestore kotlin-coroutines

我希望能够在我的 ViewModel 中使用 Kotlin 协程收听 Firebase 数据库中的实时更新。

问题是,每当在集合中创建新消息时,我的应用程序就会冻结并且无法从此状态恢复。我需要杀死它并重新启动应用程序。

它第一次通过,我可以在 UI 上看到以前的消息。SnapshotListener第二次调用时会发生此问题。

我的observer()功能

val channel = Channel<List<MessageEntity>>()
firestore.collection(path).addSnapshotListener { data, error ->
    if (error != null) {
        channel.close(error)
    } else {
        if (data != null) {
            val messages = data.toObjects(MessageEntity::class.java)
            //till this point it gets executed^^^^
            channel.sendBlocking(messages)
        } else {
            channel.close(CancellationException("No data received"))
        }
    }
}
return channel
Run Code Online (Sandbox Code Playgroud)

这就是我想观察消息的方式

launch(Dispatchers.IO) {
        val newMessages =
            messageRepository
                .observer()
                .receive()
    }
}
Run Code Online (Sandbox Code Playgroud)

在我替换sendBlocking()为之后,send()我仍然没有在频道中收到任何新消息。SnapshotListener边被执行

//channel.sendBlocking(messages) was replaced by code bellow
scope.launch(Dispatchers.IO) {
    channel.send(messages)
}
//scope is my viewModel
Run Code Online (Sandbox Code Playgroud)

如何使用 Kotlin 协程观察 firestore/realtime-dbs 中的消息?

svk*_*aka 14

我最终得到的是我使用了Flow,它是协程的一部分1.2.0-alpha-2

return flowViaChannel { channel ->
   firestore.collection(path).addSnapshotListener { data, error ->
        if (error != null) {
            channel.close(error)
        } else {
            if (data != null) {
                val messages = data.toObjects(MessageEntity::class.java)
                channel.sendBlocking(messages)
            } else {
                channel.close(CancellationException("No data received"))
            }
        }
    }
    channel.invokeOnClose {
        it?.printStackTrace()
    }
} 
Run Code Online (Sandbox Code Playgroud)

这就是我在 ViewModel 中观察它的方式

launch {
    messageRepository.observe().collect {
        //process
    }
}
Run Code Online (Sandbox Code Playgroud)

更多关于https://medium.com/@elizarov/cold-flows-hot-channels-d74769805f9


Nis*_*nth 14

我有这些扩展函数,所以我可以简单地从查询中获取作为 Flow 的结果。

Flow 是一个 Kotlin 协程结构,非常适合这个目的。 https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.flow/-flow/

@ExperimentalCoroutinesApi
fun CollectionReference.getQuerySnapshotFlow(): Flow<QuerySnapshot?> {
    return callbackFlow {
        val listenerRegistration =
            addSnapshotListener { querySnapshot, firebaseFirestoreException ->
                if (firebaseFirestoreException != null) {
                    cancel(
                        message = "error fetching collection data at path - $path",
                        cause = firebaseFirestoreException
                    )
                    return@addSnapshotListener
                }
                offer(querySnapshot)
            }
        awaitClose {
            Timber.d("cancelling the listener on collection at path - $path")
            listenerRegistration.remove()
        }
    }
}

@ExperimentalCoroutinesApi
fun <T> CollectionReference.getDataFlow(mapper: (QuerySnapshot?) -> T): Flow<T> {
    return getQuerySnapshotFlow()
        .map {
            return@map mapper(it)
        }
}
Run Code Online (Sandbox Code Playgroud)

以下是如何使用上述功能的示例。

@ExperimentalCoroutinesApi
fun getShoppingListItemsFlow(): Flow<List<ShoppingListItem>> {
    return FirebaseFirestore.getInstance()
        .collection("$COLLECTION_SHOPPING_LIST")
        .getDataFlow { querySnapshot ->
            querySnapshot?.documents?.map {
                getShoppingListItemFromSnapshot(it)
            } ?: listOf()
        }
}

// Parses the document snapshot to the desired object
fun getShoppingListItemFromSnapshot(documentSnapshot: DocumentSnapshot) : ShoppingListItem {
        return documentSnapshot.toObject(ShoppingListItem::class.java)!!
    }
Run Code Online (Sandbox Code Playgroud)

并且在您的 ViewModel 类(或您的 Fragment)中,请确保从正确的范围内调用它,以便在用户离开屏幕时适当地删除侦听器。

viewModelScope.launch {
   getShoppingListItemsFlow().collect{
     // Show on the view.
   }
}
Run Code Online (Sandbox Code Playgroud)


Ada*_*itz 5

删除回调的扩展函数

对于 Firebase 的 Firestore 数据库,有两种类型的调用。

  1. 一次性请求—— addOnCompleteListener
  2. 实时更新—— addSnapshotListener

一次性请求

对于一次请求await,库提供了一个扩展功能org.jetbrains.kotlinx:kotlinx-coroutines-play-services:X.X.X。该函数从 返回结果addOnCompleteListener

有关最新版本,请参阅 Maven 存储库kotlinx-coroutines-play-services

资源

实时更新

扩展函数awaitRealtime具有检查,包括验证 的状态continuation以查看它是否处于isActive状态。这很重要,因为当用户的主要内容提要因生命周期事件、手动刷新提要或从提要中删除内容而更新时,将调用该函数。如果没有这个检查,就会发生崩溃。

扩展函数.kt

data class QueryResponse(val packet: QuerySnapshot?, val error: FirebaseFirestoreException?)

suspend fun Query.awaitRealtime() = suspendCancellableCoroutine<QueryResponse> { continuation ->
    addSnapshotListener({ value, error ->
        if (error == null && continuation.isActive)
            continuation.resume(QueryResponse(value, null))
        else if (error != null && continuation.isActive)
            continuation.resume(QueryResponse(null, error))
    })
}
Run Code Online (Sandbox Code Playgroud)

为了处理错误,使用了try/catch模式。

存储库.kt

object ContentRepository {
    fun getMainFeedList(isRealtime: Boolean, timeframe: Timestamp) = flow<Lce<PagedListResult>> {
        emit(Loading())
        val labeledSet = HashSet<String>()
        val user = usersDocument.collection(getInstance().currentUser!!.uid)
        syncLabeledContent(user, timeframe, labeledSet, SAVE_COLLECTION, this)
        getLoggedInNonRealtimeContent(timeframe, labeledSet, this)        
    }
    // Realtime updates with 'awaitRealtime' used
    private suspend fun syncLabeledContent(user: CollectionReference, timeframe: Timestamp,
                                       labeledSet: HashSet<String>, collection: String,
                                       lce: FlowCollector<Lce<PagedListResult>>) {
        val response = user.document(COLLECTIONS_DOCUMENT)
            .collection(collection)
            .orderBy(TIMESTAMP, DESCENDING)
            .whereGreaterThanOrEqualTo(TIMESTAMP, timeframe)
            .awaitRealtime()
        if (response.error == null) {
            val contentList = response.packet?.documentChanges?.map { doc ->
                doc.document.toObject(Content::class.java).also { content ->
                    labeledSet.add(content.id)
                }
            }
            database.contentDao().insertContentList(contentList)
        } else lce.emit(Error(PagedListResult(null,
            "Error retrieving user save_collection: ${response.error?.localizedMessage}")))
    }
    // One time updates with 'await' used
    private suspend fun getLoggedInNonRealtimeContent(timeframe: Timestamp,
                                                      labeledSet: HashSet<String>,
                                                      lce: FlowCollector<Lce<PagedListResult>>) =
            try {
                database.contentDao().insertContentList(
                        contentEnCollection.orderBy(TIMESTAMP, DESCENDING)
                                .whereGreaterThanOrEqualTo(TIMESTAMP, timeframe).get().await()
                                .documentChanges
                                ?.map { change -> change.document.toObject(Content::class.java) }
                                ?.filter { content -> !labeledSet.contains(content.id) })
                lce.emit(Lce.Content(PagedListResult(queryMainContentList(timeframe), "")))
            } catch (error: FirebaseFirestoreException) {
                lce.emit(Error(PagedListResult(
                        null,
                        CONTENT_LOGGED_IN_NON_REALTIME_ERROR + "${error.localizedMessage}")))
            }
}
Run Code Online (Sandbox Code Playgroud)

  • 我尝试了您的awaitRealtime() 扩展函数,但它对我不起作用。它仅在 firestore 中的实际模型中发射一次。发出之后 continuation.isActive() 始终返回 false。您知道为什么/如何保持其活跃吗? (2认同)