Kotlin Flow 并行执行两个 API 调用,并在每个结果到达时收集它们

Abh*_*sal 9 android reactive-programming kotlin kotlin-coroutines kotlin-flow

我正在尝试使用Kotlin Flows. 这是我现在正在尝试的

flowOf(
 remoteDataSource.getDataFromCache() // suspending function returning Flow<Data>
   .catch { error -> Timber.e(error) },
 remoteDataSource.getDataFromServer() // suspending function returning Flow<Data>
).flattenConcat().collect {
  Timber.i("Response Received")
}
Run Code Online (Sandbox Code Playgroud)

这里的问题collect是只在getDataFromServer返回时调用。我的期望是我应该在几毫秒后从缓存中获取第一个事件,然后从服务器获取第二个事件。在这种情况下,"Response Received"会打印两次,但会立即一个接一个地打印。

在此其他变体中,"Response Received"仅在getDataFromServer()返回后打印一次。

 remoteDataSource.getDataFromCache() // suspending function returning Flow<Data>
  .catch { error -> Timber.e(error) }
  .flatMapConcat {
    remoteDataSource.getDataFromServer() // suspending function returning Flow<Data>
  }
  .collect {
    Timber.i("Response Received")
  }
Run Code Online (Sandbox Code Playgroud)

Flowable.concat()之前使用过 RxJava ,它运行良好。Kotlin Flows 中是否有可以模拟这种行为的东西?

Mar*_*nik 8

这里的问题是collect仅在getDataFromServer返回时调用。

你的设计中的第一个问题是,Flow-returning 功能也是可暂停的。这是两层可悬浮性。函数应该毫无延迟地返回流,并且流本身应该在传入时发出项目。如果您遵循此准则,您的初始代码已经可以工作了。

按照你编写这些函数的方式,如果你这样写,它们仍然可以工作:

flow<String> {
    emitAll(getCached())
    emitAll(getFromServer())
}
Run Code Online (Sandbox Code Playgroud)

此语句立即结束,返回冷流。当您调用collect它时,它首先调用getCached()并发出缓存的值,然后调用getFromServer()并发出服务器响应。


上述解决方案仅在您使用缓存值后才启动服务器调用。如果您需要两个流同时处于活动状态,请使用flatMapMerge

假设您解决了上述基本问题并使您的 Flow-returning 函数不暂停,您所需要的就是:

flow<String> {
    emitAll(getCached())
    emitAll(getFromServer())
}
Run Code Online (Sandbox Code Playgroud)

如果由于某种原因你不能这样做,你必须emitAll在每个调用周围添加包装器:

flowOf(getCached(), getFromServer()).flattenMerge()
Run Code Online (Sandbox Code Playgroud)


Abh*_*sal 2

事实证明,如果flowOf(someOperation()) someOperation()需要完成下游才能开始处理。就像Observable.just(someOperation())RxJava世界上一样。

在第二种情况下flatMapConcat实际上是一个transform运算符,因此它显然返回最终处理的输出。

Flow世界里似乎缺乏concat像原生的算子。这就是我最终解决这个问题的方法

flow {
   remoteDataSource.getDataFromCache()
   .catch { error -> Timber.e(error) }
   .onCompletion {
       remoteDataSource.getDataFromServer()
            .collect {
                 emit(it)
            }
    }.collect { emit(it) }
}
Run Code Online (Sandbox Code Playgroud)