svk*_*aka 16 android kotlin google-cloud-firestore kotlin-coroutines
我希望能够在我的 ViewModel 中使用 Kotlin 协程收听 Firebase 数据库中的实时更新。
问题是,每当在集合中创建新消息时,我的应用程序就会冻结并且无法从此状态恢复。我需要杀死它并重新启动应用程序。
它第一次通过,我可以在 UI 上看到以前的消息。SnapshotListener第二次调用时会发生此问题。
我的observer()功能
val channel = Channel<List<MessageEntity>>()
firestore.collection(path).addSnapshotListener { data, error ->
if (error != null) {
channel.close(error)
} else {
if (data != null) {
val messages = data.toObjects(MessageEntity::class.java)
//till this point it gets executed^^^^
channel.sendBlocking(messages)
} else {
channel.close(CancellationException("No data received"))
}
}
}
return channel
Run Code Online (Sandbox Code Playgroud)
这就是我想观察消息的方式
launch(Dispatchers.IO) {
val newMessages =
messageRepository
.observer()
.receive()
}
}
Run Code Online (Sandbox Code Playgroud)
在我替换sendBlocking()为之后,send()我仍然没有在频道中收到任何新消息。SnapshotListener边被执行
//channel.sendBlocking(messages) was replaced by code bellow
scope.launch(Dispatchers.IO) {
channel.send(messages)
}
//scope is my viewModel
Run Code Online (Sandbox Code Playgroud)
如何使用 Kotlin 协程观察 firestore/realtime-dbs 中的消息?
svk*_*aka 14
我最终得到的是我使用了Flow,它是协程的一部分1.2.0-alpha-2
return flowViaChannel { channel ->
firestore.collection(path).addSnapshotListener { data, error ->
if (error != null) {
channel.close(error)
} else {
if (data != null) {
val messages = data.toObjects(MessageEntity::class.java)
channel.sendBlocking(messages)
} else {
channel.close(CancellationException("No data received"))
}
}
}
channel.invokeOnClose {
it?.printStackTrace()
}
}
Run Code Online (Sandbox Code Playgroud)
这就是我在 ViewModel 中观察它的方式
launch {
messageRepository.observe().collect {
//process
}
}
Run Code Online (Sandbox Code Playgroud)
更多关于https://medium.com/@elizarov/cold-flows-hot-channels-d74769805f9
Nis*_*nth 14
我有这些扩展函数,所以我可以简单地从查询中获取作为 Flow 的结果。
Flow 是一个 Kotlin 协程结构,非常适合这个目的。 https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.flow/-flow/
@ExperimentalCoroutinesApi
fun CollectionReference.getQuerySnapshotFlow(): Flow<QuerySnapshot?> {
return callbackFlow {
val listenerRegistration =
addSnapshotListener { querySnapshot, firebaseFirestoreException ->
if (firebaseFirestoreException != null) {
cancel(
message = "error fetching collection data at path - $path",
cause = firebaseFirestoreException
)
return@addSnapshotListener
}
offer(querySnapshot)
}
awaitClose {
Timber.d("cancelling the listener on collection at path - $path")
listenerRegistration.remove()
}
}
}
@ExperimentalCoroutinesApi
fun <T> CollectionReference.getDataFlow(mapper: (QuerySnapshot?) -> T): Flow<T> {
return getQuerySnapshotFlow()
.map {
return@map mapper(it)
}
}
Run Code Online (Sandbox Code Playgroud)
以下是如何使用上述功能的示例。
@ExperimentalCoroutinesApi
fun getShoppingListItemsFlow(): Flow<List<ShoppingListItem>> {
return FirebaseFirestore.getInstance()
.collection("$COLLECTION_SHOPPING_LIST")
.getDataFlow { querySnapshot ->
querySnapshot?.documents?.map {
getShoppingListItemFromSnapshot(it)
} ?: listOf()
}
}
// Parses the document snapshot to the desired object
fun getShoppingListItemFromSnapshot(documentSnapshot: DocumentSnapshot) : ShoppingListItem {
return documentSnapshot.toObject(ShoppingListItem::class.java)!!
}
Run Code Online (Sandbox Code Playgroud)
并且在您的 ViewModel 类(或您的 Fragment)中,请确保从正确的范围内调用它,以便在用户离开屏幕时适当地删除侦听器。
viewModelScope.launch {
getShoppingListItemsFlow().collect{
// Show on the view.
}
}
Run Code Online (Sandbox Code Playgroud)
对于 Firebase 的 Firestore 数据库,有两种类型的调用。
addOnCompleteListeneraddSnapshotListener对于一次请求await,库提供了一个扩展功能org.jetbrains.kotlinx:kotlinx-coroutines-play-services:X.X.X。该函数从 返回结果addOnCompleteListener。
有关最新版本,请参阅 Maven 存储库kotlinx-coroutines-play-services。
资源
扩展函数awaitRealtime具有检查,包括验证 的状态continuation以查看它是否处于isActive状态。这很重要,因为当用户的主要内容提要因生命周期事件、手动刷新提要或从提要中删除内容而更新时,将调用该函数。如果没有这个检查,就会发生崩溃。
扩展函数.kt
data class QueryResponse(val packet: QuerySnapshot?, val error: FirebaseFirestoreException?)
suspend fun Query.awaitRealtime() = suspendCancellableCoroutine<QueryResponse> { continuation ->
addSnapshotListener({ value, error ->
if (error == null && continuation.isActive)
continuation.resume(QueryResponse(value, null))
else if (error != null && continuation.isActive)
continuation.resume(QueryResponse(null, error))
})
}
Run Code Online (Sandbox Code Playgroud)
为了处理错误,使用了try/catch模式。
存储库.kt
object ContentRepository {
fun getMainFeedList(isRealtime: Boolean, timeframe: Timestamp) = flow<Lce<PagedListResult>> {
emit(Loading())
val labeledSet = HashSet<String>()
val user = usersDocument.collection(getInstance().currentUser!!.uid)
syncLabeledContent(user, timeframe, labeledSet, SAVE_COLLECTION, this)
getLoggedInNonRealtimeContent(timeframe, labeledSet, this)
}
// Realtime updates with 'awaitRealtime' used
private suspend fun syncLabeledContent(user: CollectionReference, timeframe: Timestamp,
labeledSet: HashSet<String>, collection: String,
lce: FlowCollector<Lce<PagedListResult>>) {
val response = user.document(COLLECTIONS_DOCUMENT)
.collection(collection)
.orderBy(TIMESTAMP, DESCENDING)
.whereGreaterThanOrEqualTo(TIMESTAMP, timeframe)
.awaitRealtime()
if (response.error == null) {
val contentList = response.packet?.documentChanges?.map { doc ->
doc.document.toObject(Content::class.java).also { content ->
labeledSet.add(content.id)
}
}
database.contentDao().insertContentList(contentList)
} else lce.emit(Error(PagedListResult(null,
"Error retrieving user save_collection: ${response.error?.localizedMessage}")))
}
// One time updates with 'await' used
private suspend fun getLoggedInNonRealtimeContent(timeframe: Timestamp,
labeledSet: HashSet<String>,
lce: FlowCollector<Lce<PagedListResult>>) =
try {
database.contentDao().insertContentList(
contentEnCollection.orderBy(TIMESTAMP, DESCENDING)
.whereGreaterThanOrEqualTo(TIMESTAMP, timeframe).get().await()
.documentChanges
?.map { change -> change.document.toObject(Content::class.java) }
?.filter { content -> !labeledSet.contains(content.id) })
lce.emit(Lce.Content(PagedListResult(queryMainContentList(timeframe), "")))
} catch (error: FirebaseFirestoreException) {
lce.emit(Error(PagedListResult(
null,
CONTENT_LOGGED_IN_NON_REALTIME_ERROR + "${error.localizedMessage}")))
}
}
Run Code Online (Sandbox Code Playgroud)
| 归档时间: |
|
| 查看次数: |
6615 次 |
| 最近记录: |