与 LiveData 类似,如何在协程之外获取 Flow 的值?
// Suspend function 'first' should be called only from a coroutine or another suspend function
flowOf(1).first()
Run Code Online (Sandbox Code Playgroud)
// value is null
flowOf(1).asLiveData().value
Run Code Online (Sandbox Code Playgroud)
// works
MutableLiveData(1).value
Run Code Online (Sandbox Code Playgroud)
语境
我避免LiveData在存储库层中使用Flow. 然而,我需要设置、观察和收集立即消费的价值。后者对于 OkHttp3 中的身份验证很有用Interceptor。
我试图在我的应用程序中使用 Flow 将我的 Paging 2 实现转换为 Paging 3,但受到“Channel was closed”异常的困扰。该应用程序运行良好,但在一段时间后崩溃(从几秒钟到几分钟)。它从网络加载数据,这可能很慢。使用 Room 作为内部数据库。模拟器上的崩溃比真实设备上的要频繁得多。
我不熟悉 Flow,我应该注意 Flow 系统中是否有任何已知的罪魁祸首?
有没有其他人在使用 androidx 的 Paging 3 时遇到过类似的情况?
因此,如果这是一个已知问题(可能有明显的修复方法),请给我一个提示;与否,我可能会发布 Paging 3 代码部分(这将需要一些时间)。
我的异常如下所示:
2020-06-29 09:23:59.929 28295-28295/no.rogo.emptyfuel E/AndroidRuntime: FATAL EXCEPTION: main
Process: no.rogo.emptyfuel, PID: 28295
kotlinx.coroutines.channels.ClosedSendChannelException: Channel was closed
at kotlinx.coroutines.channels.Closed.getSendException(AbstractChannel.kt:1035)
at kotlinx.coroutines.channels.AbstractSendChannel.helpCloseAndResumeWithSendException(AbstractChannel.kt:206)
at kotlinx.coroutines.channels.AbstractSendChannel.access$helpCloseAndResumeWithSendException(AbstractChannel.kt:19)
at kotlinx.coroutines.channels.AbstractSendChannel.sendSuspend(AbstractChannel.kt:196)
at kotlinx.coroutines.channels.AbstractSendChannel.send(AbstractChannel.kt:135)
at kotlinx.coroutines.channels.ChannelCoroutine.send$suspendImpl(Unknown Source:2)
at kotlinx.coroutines.channels.ChannelCoroutine.send(Unknown Source:0)
at androidx.paging.PageFetcherSnapshot$pageEventFlow$1$2$invokeSuspend$$inlined$collect$1.emit(Collect.kt:137)
at kotlinx.coroutines.flow.FlowKt__ChannelsKt.emitAllImpl$FlowKt__ChannelsKt(Channels.kt:58)
at kotlinx.coroutines.flow.FlowKt__ChannelsKt$emitAllImpl$1.invokeSuspend(Unknown Source:11)
at kotlin.coroutines.jvm.internal.BaseContinuationImpl.resumeWith(ContinuationImpl.kt:33)
at kotlinx.coroutines.DispatchedTask.run(DispatchedTask.kt:56)
at kotlinx.coroutines.EventLoop.processUnconfinedEvent(EventLoop.common.kt:69)
at kotlinx.coroutines.DispatchedTaskKt.resumeUnconfined(DispatchedTask.kt:184)
at kotlinx.coroutines.DispatchedTaskKt.dispatch(DispatchedTask.kt:108)
at kotlinx.coroutines.CancellableContinuationImpl.dispatchResume(CancellableContinuationImpl.kt:308)
at kotlinx.coroutines.CancellableContinuationImpl.completeResume(CancellableContinuationImpl.kt:395)
at …Run Code Online (Sandbox Code Playgroud) 我想为 Kotlin Flows 的永久循环和发出结果制定一个很好的逻辑。用例是,每 n 分钟我需要更新应用程序中的配置,并且此配置来自其余 api。
我认为一个不错的解决方案是运行一个“调度程序”,在后台每 n 分钟轮询一次 api,并且ConfigService订阅该调度程序的调度程序可以在调度程序发出新值时更新它自己的状态。
使用 RxJava 这将是
Observable.interval(n, TimeUnit.MINUTES)
.flatMap( ... )
Run Code Online (Sandbox Code Playgroud)
但由于我使用 Kotlin,我认为我可以使用原生 Flow 库实现相同的逻辑。那会是什么样子?我试图用谷歌搜索,要么没有找到正确的关键字,要么之前没有人遇到过同样的问题?
我正在使用 datastore-preferences:1.0.0-alpha01 并且当数据存储值更新时我似乎无法恢复事件。我一直试图在片段和父活动中观察它,结果相同。当我创建 DataStore 的实例并观察值流时,我在声明观察者后立即收到一个事件(注册观察者后似乎很常见)。这将表明流正在运行,并且观察者正在按预期接收事件。然后我在同一个观察者内部更新首选项的值,该观察者从不接收事件。
我使用这篇文章作为参考https://medium.com/scalereal/hello-datastore-bye-sharedpreferences-android-f46c610b81d5,它与我用来比较我的实现的 repo 一起使用。显然,这个是有效的。https://github.com/PatilShreyas/DataStoreExample
数据存储实用程序
class DataStoreUtils(context: Context) {
companion object {
private const val TAG = "DataStoreUtils"
}
private val dataStore = context.createDataStore(name = Constants.PrefName.APP_PREFS)
suspend fun setString(prefKey: String, value: String) {
Log.d(TAG, "Setting $prefKey to $value")
dataStore.edit { it[preferencesKey<String>(prefKey)] = value }
}
suspend fun getString(prefKey: String): String? {
return dataStore.data.map { it[preferencesKey<String>(prefKey)] ?: return@map null }.first()
}
val usernameFlow: Flow<String?> = dataStore.data
.catch {
if (it is IOException) {
it.printStackTrace() …Run Code Online (Sandbox Code Playgroud) android android-lifecycle kotlin-flow android-jetpack-datastore
我正在实现 ML Kit OCR 功能,有时,前几个值是错误的,只有在一段时间后相机才会稳定并产生正确的值。我不想删除前 X 值,因为我不知道该流将包含多少个元素。因此,最好的方法是使用某种条件,将当前元素与前一个元素进行比较,但不确定。
Kotlin Flow API 中是否有一个函数可以比较收集的值并仅收集至少出现 N 次的值?
private val _detectedValues = ConflatedBroadcastChannel<String>()
val detectedFlow = _detectedValues
.asFlow()
.map { it.replace(" ", "") }
.filter { it.checkRegex() }
.onEach {
Log.i(TAG, "detected: $it")
}
Run Code Online (Sandbox Code Playgroud) 假设我有一个像这样的 API:
interface Foo {
val barFlow: Flow<Bar>
}
Run Code Online (Sandbox Code Playgroud)
我像这样消费它:
class FooConsumer(private val foo: Foo) {
init {
CoroutineScope(Dispatchers.IO).launch {
val bar = foo.barFlow.single()
println("Collected bar: $bar)
}
}
}
Run Code Online (Sandbox Code Playgroud)
根据文档,如果流为空,则可以抛出singlea 。NoSuchElementException然而,这让我很困惑,因为流上的终端操作将“等待”流的元素被发出。那么调用如何single知道流中没有元素呢?也许某个元素还没有被发射?
我的意思是,在幕后,调用single是在进行检查之前收集源流。因此,在执行 null 检查之前,必须至少发出 1 项,以便 null 检查永远不会成功,并且NoSuchElementException永远不会抛出 a (对于流属于不可为 null 类型的情况)。
那么NoSuchElementException只有可空类型的流才有可能吗?
这是源代码single:
/**
* The terminal operator, that awaits for one and only one value to be published.
* Throws [NoSuchElementException] for empty …Run Code Online (Sandbox Code Playgroud) 我有下面的两个代码,结果对我来说看起来是一样的
val namesFlow = flow {
println("Start flow")
(0..10).forEach {
// Emit items with 500 milliseconds delay
delay(500)
println("Emitting $it")
emit(it)
}
}.map { it * it }
fun main() = runBlocking {
namesFlow.collect { println(it) }
namesFlow.collect { println(it) }
println("Finish Flow")
}
Run Code Online (Sandbox Code Playgroud)
和
val namesFlow = channelFlow {
println("Start flow")
(0..10).forEach {
// Emit items with 500 milliseconds delay
delay(500)
println("Emitting $it")
send(it)
}
}.map { it * it }
fun main() = runBlocking {
namesFlow.collect { println(it) …Run Code Online (Sandbox Code Playgroud) 我有以下测试
@Test
fun combineUnendingFlows() = runBlockingTest {
val source1 = flow {
emit("a")
emit("b")
suspendCancellableCoroutine { /* Never complete. */ }
}
val source2 = flowOf(1, 2, 3)
val combinations = mutableListOf<String>()
combine(source1, source2) { first, second -> "$first$second" }
.onEach(::println)
.onEach(combinations::add)
.launchIn(this)
advanceUntilIdle()
assertThat(combinations).containsExactly("a1", "b1", "b2", "b3")
}
Run Code Online (Sandbox Code Playgroud)
断言成功,但测试失败,但出现以下异常:
kotlinx.coroutines.test.UncompletedCoroutinesError: Test finished with active jobs
Run Code Online (Sandbox Code Playgroud)
我知道这是人为的,我们可以通过确保完成来轻松完成此传递source1,但我想知道为什么它会失败?runBlockingTest测试永无止境的流程的方法是否错误?
(这是与协程一起使用的1.4.0)。
我有以下代码:
\n val history: Flow<PagingData<Any>> = Pager(PagingConfig(pageSize = 10)) {\n PaginationBaseDataSource(apiService)\n}.flow\n .cachedIn(viewModelScope)\nRun Code Online (Sandbox Code Playgroud)\n当前正在显示没有任何附加参数的项目列表。这工作正常...但现在我希望根据用户可以在前端更改的某些参数来查询此列表,让\xc2\xb4s 说我希望添加参数 3 作为查询。
\n val history: Flow<PagingData<Any>> = Pager(PagingConfig(pageSize = 10)) {\n PaginationBaseDataSource(apiService, 3)\n}.flow\n .cachedIn(viewModelScope)\nRun Code Online (Sandbox Code Playgroud)\n问题是...我怎样才能动态设置这个查询参数?让\xc2\xb4s 说用户而不是3,使用6,然后是9。我怎样才能实现这一点?
\n非常感谢
\nStateFlow / SharedFlow 中此实时数据转换的等效代码是什么?
val myLiveData: LiveData<MyLiveData> = Transformations
.switchMap(_query) {
if (it == null) {
AbsentLiveData.create()
} else {
repository.load()
}
Run Code Online (Sandbox Code Playgroud)
基本上,我想监听每个查询更改以对返回的内容做出反应。因此,任何类似于使用 StateFlow / SharedFlow 的东西都是受欢迎的。
kotlin android-livedata kotlin-flow kotlin-stateflow kotlin-sharedflow
kotlin-flow ×10
kotlin ×8
android ×5
androidx ×1
coroutine ×1
firebase ×1
pagination ×1
paging ×1