标签: kotlin-flow

为什么 ConflatedBroadcacstChannel.asFlow() 没有调用 Flow.onCompletion?

Coroutines+Flow/LiveData 的新 Android 代码实验室中,您更新了 LiveData api 以使用 Flow,但我注意到一些onCompletion未按预期调用的意外行为。我们要替换的 LiveData 代码:

\n\n
viewModelScope.launch {\n  try {\n    _spinner.value = true\n  ...query some data...\n  } catch (error: Throwable) {\n    _snackbar.value = error.message\n  } finally {\n    _spinner.value = false\n  }\n}\n
Run Code Online (Sandbox Code Playgroud)\n\n

我们在代码实验室结束时得到的流程代码:

\n\n
val growZoneChannel = ConflatedBroadcastChannel<Int>()\n...\ngrowZoneChannel.asFlow()\n       .mapLatest { growZone ->\n           _spinner.value = true\n           ...query some data...\n       }\n       .onCompletion {  _spinner.value = false }\n       .catch { throwable ->  _snackbar.value = throwable.message  }\n       .launchIn(viewModelScope)\n
Run Code Online (Sandbox Code Playgroud)\n\n

当您运行 Flow 时,微调器永远不会消失,因为即使mapLatest转换完成,也永远不会调用 onCompletion。代码实验室声明,“ …

android kotlin kotlin-coroutines kotlin-flow

5
推荐指数
0
解决办法
2187
查看次数

具有 Room 和状态处理功能的 Kotlin 协程流

我正在尝试新的协程流程,我的目标是创建一个简单的存储库,可以从 Web api 获取数据并将其保存到数据库,还可以从数据库返回流程。

我使用 room 和 firebase 作为 Web api,现在一切看起来都非常简单,直到我尝试将来自 api 的错误传递到 ui。

由于我从数据库获取的流仅包含数据而没有状态,因此通过将其与 Web api 结果相结合来为其提供状态(如加载、内容、错误)的正确方法是什么?

我写的一些代码:

DAO:

@Query("SELECT * FROM users")
fun getUsers(): Flow<List<UserPojo>>
Run Code Online (Sandbox Code Playgroud)

存储库:

val users: Flow<List<UserPojo>> = userDao.getUsers()
Run Code Online (Sandbox Code Playgroud)

API 调用:

override fun downloadUsers(filters: UserListFilters, onResult: (result: FailableWrapper<MutableList<UserApiPojo>>) -> Unit) {
    val data = Gson().toJson(filters)

    functions.getHttpsCallable("users").call(data).addOnSuccessListener {
        try {
            val type = object : TypeToken<List<UserApiPojo>>() {}.type
            val users = Gson().fromJson<List<UserApiPojo>>(it.data.toString(), type)
            onResult.invoke(FailableWrapper(users.toMutableList(), null))
        } catch (e: java.lang.Exception) {
            onResult.invoke(FailableWrapper(null, "Error parsing data"))
        }
    }.addOnFailureListener {
        onResult(FailableWrapper(null, …
Run Code Online (Sandbox Code Playgroud)

android firebase android-room kotlin-coroutines kotlin-flow

5
推荐指数
1
解决办法
8405
查看次数

Kotlin 流程:仅收集重复至少 N 次的元素

我正在实现 ML Kit OCR 功能,有时,前几个值是错误的,只有在一段时间后相机才会稳定并产生正确的值。我不想删除前 X 值,因为我不知道该流将包含多少个元素。因此,最好的方法是使用某种条件,将当前元素与前一个元素进行比较,但不确定。

Kotlin Flow API 中是否有一个函数可以比较收集的值并仅收集至少出现 N 次的值?

  private val _detectedValues = ConflatedBroadcastChannel<String>()
  val detectedFlow = _detectedValues
      .asFlow()
      .map { it.replace(" ", "") }
      .filter { it.checkRegex() }
      .onEach {
          Log.i(TAG, "detected: $it")
      }
Run Code Online (Sandbox Code Playgroud)

android kotlin firebase kotlin-coroutines kotlin-flow

5
推荐指数
1
解决办法
8138
查看次数

调用 flow.single() 时何时抛出 NoSuchElementException

假设我有一个像这样的 API:

interface Foo {
   val barFlow: Flow<Bar>
}
Run Code Online (Sandbox Code Playgroud)

我像这样消费它:

class FooConsumer(private val foo: Foo) {
   init {
       CoroutineScope(Dispatchers.IO).launch {
           val bar = foo.barFlow.single()
           println("Collected bar: $bar)
       }
   }
}
Run Code Online (Sandbox Code Playgroud)

根据文档,如果流为空,则可以抛出singlea 。NoSuchElementException然而,这让我很困惑,因为流上的终端操作将“等待”流的元素被发出。那么调用如何single知道流中没有元素呢?也许某个元素还没有被发射?

我的意思是,在幕后,调用single是在进行检查之前收集源流。因此,在执行 null 检查之前,必须至少发出 1 项,以便 null 检查永远不会成功,并且NoSuchElementException永远不会抛出 a (对于流属于不可为 null 类型的情况)。

那么NoSuchElementException只有可空类型的流才有可能吗?

这是源代码single

/**
 * The terminal operator, that awaits for one and only one value to be published.
 * Throws [NoSuchElementException] for empty …
Run Code Online (Sandbox Code Playgroud)

kotlin kotlin-coroutines kotlin-flow

5
推荐指数
1
解决办法
3890
查看次数

流和通道流有什么区别?

我有下面的两个代码,结果对我来说看起来是一样的

val namesFlow = flow {
    println("Start flow")
    (0..10).forEach {
        // Emit items with 500 milliseconds delay
        delay(500)
        println("Emitting $it")
        emit(it)
    }
}.map { it * it }

fun main() = runBlocking {
    namesFlow.collect { println(it) }
    namesFlow.collect { println(it) }
    println("Finish Flow")
}

Run Code Online (Sandbox Code Playgroud)

val namesFlow = channelFlow {
    println("Start flow")
    (0..10).forEach {
        // Emit items with 500 milliseconds delay
        delay(500)
        println("Emitting $it")
        send(it)
    }
}.map { it * it }

fun main() = runBlocking {
    namesFlow.collect { println(it) …
Run Code Online (Sandbox Code Playgroud)

kotlin kotlin-coroutines kotlin-flow

5
推荐指数
1
解决办法
4271
查看次数

使用无限流测试 Kotlin Flow.combine

我有以下测试

@Test
fun combineUnendingFlows() = runBlockingTest {

    val source1 = flow {
        emit("a")
        emit("b")
        suspendCancellableCoroutine { /* Never complete. */ }
    }

    val source2 = flowOf(1, 2, 3)

    val combinations = mutableListOf<String>()

    combine(source1, source2) { first, second -> "$first$second" }
        .onEach(::println)
        .onEach(combinations::add)
        .launchIn(this)

    advanceUntilIdle()

    assertThat(combinations).containsExactly("a1", "b1", "b2", "b3")
}
Run Code Online (Sandbox Code Playgroud)

断言成功,但测试失败,但出现以下异常:

kotlinx.coroutines.test.UncompletedCoroutinesError: Test finished with active jobs
Run Code Online (Sandbox Code Playgroud)

我知道这是人为的,我们可以通过确保完成来轻松完成此传递source1,但我想知道为什么它会失败?runBlockingTest测试永无止境的流程的方法是否错误?

(这是与协程一起使用的1.4.0)。

kotlin kotlin-coroutines kotlin-flow

5
推荐指数
1
解决办法
2478
查看次数

Kotlin 中带有新参数的 Paging 3.0 列表

我有以下代码:

\n
   val history: Flow<PagingData<Any>> = Pager(PagingConfig(pageSize = 10)) {\n    PaginationBaseDataSource(apiService)\n}.flow\n    .cachedIn(viewModelScope)\n
Run Code Online (Sandbox Code Playgroud)\n

当前正在显示没有任何附加参数的项目列表。这工作正常...但现在我希望根据用户可以在前端更改的某些参数来查询此列表,让\xc2\xb4s 说我希望添加参数 3 作为查询。

\n
   val history: Flow<PagingData<Any>> = Pager(PagingConfig(pageSize = 10)) {\n    PaginationBaseDataSource(apiService, 3)\n}.flow\n    .cachedIn(viewModelScope)\n
Run Code Online (Sandbox Code Playgroud)\n

问题是...我怎样才能动态设置这个查询参数?让\xc2\xb4s 说用户而不是3,使用6,然后是9。我怎样才能实现这一点?

\n

非常感谢

\n

paging pagination android kotlin kotlin-flow

5
推荐指数
1
解决办法
1344
查看次数

LiveData 转换为 StateFlow / SharedFlow

StateFlow / SharedFlow 中此实时数据转换的等效代码是什么?

val myLiveData: LiveData<MyLiveData> = Transformations
                    .switchMap(_query) {
                        if (it == null) {
                           AbsentLiveData.create()
                        } else {
                           repository.load()
                     }
Run Code Online (Sandbox Code Playgroud)

基本上,我想监听每个查询更改以对返回的内容做出反应。因此,任何类似于使用 StateFlow / SharedFlow 的东西都是受欢迎的。

kotlin android-livedata kotlin-flow kotlin-stateflow kotlin-sharedflow

5
推荐指数
1
解决办法
2601
查看次数

android协程流程手动重试

我在 viewModel 中使用 stateFlow 来获取带有密封类的 api 调用的结果,如下所示:

sealed class Resource<out T> {
    data class Loading<T>(val data: T?): Resource<T>()
    data class Success<T>(val data: T?): Resource<T>()
    data class Error<T>(val error: Exception, val data: T?, val time: Long = System.currentTimeMillis()): Resource<Nothing>()
}
Run Code Online (Sandbox Code Playgroud)
class VehicleViewModel @ViewModelInject constructor(application: Application, private val vehicleRepository: VehicleRepository): BaseViewModel(application) {

    val vehiclesResource: StateFlow<Resource<List<Vehicle>>> = vehicleRepository.getVehicles().shareIn(viewModelScope, SharingStarted.Eagerly, replay = 1)
}
Run Code Online (Sandbox Code Playgroud)

我想在我的 UI 中建议一个按钮,以便如果 api 调用失败,用户可以重试调用。我知道流程上有一个retry方法,但无法手动调用它,因为它仅在发生异常时触发。

一个常见的用例是:用户没有互联网连接,当 api 调用返回网络异常时,我向用户显示一条消息,告诉他检查其连接,然后使用重试按钮(或通过检测设备现在已连接,无论如何),我重试该流程。

但我想不出一种方法来做到这一点,就像你不能那样,比如 call flow.retry()retry一旦发生异常,就会调用实际的方法。我不想在不要求用户检查其连接的情况下立即重试,这没有意义。

实际上,我发现的唯一解决方案是在按下重试按钮时重新创建活动,以便重置流程,但这当然对性能来说很糟糕。

如果没有流程,解决方案很简单,并且有大量示例,您只需重新启动作业即可,但我找不到一种方法来正确使用流程。我的存储库中有本地房间数据库和远程服务之间的逻辑,并且 flow api …

android kotlin-coroutines kotlin-flow

5
推荐指数
1
解决办法
2238
查看次数

Android - 如何让recyclerview ListAdpater停止闪烁?

我有一个回收器视图,其适配器使用 ListAdapter (版本 1.1.0):

class InnerEpisodeFragmentAdapter(
    private val actCtx: Context,
) : ListAdapter<Episode, InnerEpisodeFragmentAdapter.MViewHolder>(COMPARATOR) {
...
Run Code Online (Sandbox Code Playgroud)

回收器视图由来自 Room 数据库 Episode 表的 kotlin 流提供:

vm.episodesFlow().asLiveData().observe(viewLifecycleOwner) { episodes ->
  episodes.let { adapter.submitList(it) }
}

@Query("SELECT * FROM Episode ORDER BY pubDate DESC")
fun episodesFlow(): Flow<List<Episode>>
Run Code Online (Sandbox Code Playgroud)

例外的是,每次剧集表中的元组发生更改时,都会发出新的剧集列表并更新回收器视图。

它工作正常,但每次更新时都会出现可怕的闪烁。它给用户带来了糟糕的体验。

当流发出新值时如何避免这种闪烁?

在我的应用程序的先前版本中,我使用了像notifyDataSetChanged()或notifyItemChanged()这样从不闪烁的函数。我知道我仍然可以尝试使用这些函数,但如果我在使用kotlin时无法避免闪烁,我会非常失望流程如上图。谢谢。

android android-recyclerview android-listadapter kotlin-flow

5
推荐指数
1
解决办法
4096
查看次数