标签: kotlin-flow

如果 Kotlin Flows 发出第一个值的时间过长,如何使其超时

我有一个可能永远不会被调用的监听器。但是,如果它至少被调用一次,我有理由确信它会被调用很多次。我是 Flows 的粉丝,因此我将其包装在callbackFlow() 构建器中。为了防止永远等待,我想添加一个超时时间。我正在尝试构建流运算符,如果流的第一个元素发出时间太长,它将抛出某种超时。这是我所拥有的。

fun <T> Flow<T>.flowBeforeTimeout(ms: Long): Flow<T> = flow {
    withTimeout(ms){
        emit(first())
    }
    emitAll(this@flowBeforeTimeout.drop(1))
}
Run Code Online (Sandbox Code Playgroud)

效果还不错,这些 JUnit4 测试都通过了。还有更多通过的测试,但为了简洁起见我省略了它们。

@Test(expected = CancellationException::class)
fun `Throws on timeout`(): Unit = runBlocking {
    val testFlow = flow {
        delay(200)
        emit(1)
    }

    testFlow.flowBeforeTimeout(100).toList()
}

Run Code Online (Sandbox Code Playgroud)
@Test
fun `No distortion`(): Unit = runBlocking {
    val testList = listOf(1,2,3)

    val resultList = testList
        .asFlow()
        .flowBeforeTimeout(100)
        .toList()

    assertThat(testList.size, `is`(resultList.size))
}
Run Code Online (Sandbox Code Playgroud)

然而,这个测试没有通过。

// Fails with: Expected: is <1> but: was <2>
@Test
fun `Starts only once`(): Unit = …
Run Code Online (Sandbox Code Playgroud)

kotlin kotlin-flow

8
推荐指数
1
解决办法
907
查看次数

Kotlin Flow 的 GroupBy 运算符

我正在尝试从 RxJava 切换到 Kotlin Flow。流量真的很震撼。但是现在在 kotlin Flow 中是否有类似于 RxJava 的“GroupBy”的运算符?

kotlin kotlin-flow

7
推荐指数
2
解决办法
1392
查看次数

Flow LifeCycle 是否识别为 LiveData?

我们知道LiveData 具有生命周期感知能力,如果配置发生更改,LiveData 对象不会每次都从数据库(本地/远程)重新查询,并且仅当数据有任何更新时才会更新。

最近我开始使用Kotlin Flow,我应该承认它最适合数据层,即在存储库中实现,以便通知 ViewModel。但我也在 ViewModel/View 层中使用了Kotlin Flow,以便直接根据其状态collect(密封类实现)来确定Fragment 中的 Flow 对象。我在使用 Flow 时遇到的问题是每次配置更改时都会从数据库(本地/远程)检索数据。

这种情况应该怎么办?有没有办法在使用 Flow 时避免重新查询,或者我应该只在 ViewModel/View 层中使用 LiveData?

示例代码

sealed class Status<T> {
class Processing<T> : Status<T>()
data class Completed<T>(val value: T) : Status<T>()
data class Error<T>(val error: String) : Status<T>()

companion object {
    fun <T> processing() = Processing<T>()
    fun <T> completed(value: T) = Completed(value)
    fun <T> error(error: String) = Error<T>(error)
    }
}
Run Code Online (Sandbox Code Playgroud)

回购协议:

class Repo(database: LocalDatabase){
     fun retrieveUsersData() …
Run Code Online (Sandbox Code Playgroud)

android kotlin android-livedata kotlin-flow

7
推荐指数
2
解决办法
5120
查看次数

kotlin flow onEach 没有被触发

我正在尝试使用 来存储价值DataStore


class BasicDataStore(context: Context) :
    PrefsDataStore(
        context,
        PREF_FILE_BASIC
    ),
    BasicImpl {

    override val serviceRunning: Flow<Boolean>
        get() = dataStore.data.map { preferences ->
            preferences[SERVICE_RUNNING_KEY] ?: false
        }

    override suspend fun setServiceRunningToStore(serviceRunning: Boolean) {
        dataStore.edit { preferences ->
            preferences[SERVICE_RUNNING_KEY] = serviceRunning
        }
    }

    companion object {
        private const val PREF_FILE_BASIC = "basic_preference"
        private val SERVICE_RUNNING_KEY = booleanPreferencesKey("service_running")
    }
}

@Singleton
interface BasicImpl {
    val serviceRunning: Flow<Boolean>
    suspend fun setServiceRunningToStore(serviceRunning: Boolean)
}
Run Code Online (Sandbox Code Playgroud)

在 a 中Service,尝试监视该值,以下是相应的代码:

private fun monitorNotificationService() { …
Run Code Online (Sandbox Code Playgroud)

datastore coroutine kotlin kotlin-coroutines kotlin-flow

7
推荐指数
1
解决办法
3995
查看次数

async{} 内部流程

我可以在 kotlin flow 中使用 async{} 吗?

场景:API 调用后,我得到了需要解析的 200 个对象的列表(转换为 UIObject)。我正在尝试并行处理这个列表。下面是伪代码:

 fun getUIObjectListFlow():Flow<List<UIObject>> {
    flow<List<UIObject>> {
        while (stream.hasNext()) {
            val data = stream.getData() // reading from an input stream. Data comes in chunk

            val firstHalfDeffered = async(Dispatchers.IO) { /* process first half of the list that data obj contains*/ }
            val secondHalfDeffered = async(Dispatchers.IO) { /*process second half of the list that data obj contains */ }
            val processedList = firstHalfDeffered.await() + secondHalfDeffered.await() // just pseudo code

            emit(processedList)
        } …
Run Code Online (Sandbox Code Playgroud)

android kotlin kotlin-coroutines kotlin-flow

7
推荐指数
1
解决办法
7741
查看次数

如何在 Jetpack Compose 中使用 SharedFlow

通过状态流我可以使用

val items by myViewModel.items.collectAsState()
Run Code Online (Sandbox Code Playgroud)

我想共享流不能以这种方式使用。共享流是否适用于 Compose?

android kotlin-coroutines android-jetpack-compose kotlin-flow

7
推荐指数
2
解决办法
7698
查看次数

SharedFlow 不从发射中收集

在我的 ViewModel 中,我正在发出 API 请求,并且正在使用 FragmentStateFlowSharedFlow与 Fragment 进行通信。在发出 API 请求时,我可以轻松更新状态流的值,并将其成功收集到片段中。

但在发出请求之前,我发出了一些布尔值,SharedFlow并且它没有收集在片段中。有人可以帮助我为什么会发生这种情况吗?

class MainViewModel: ViewModel() {
  private val _stateFlow = MutableStateFlow(emptyList<Model>())
  val stateFlow = _stateFlow.asStateFlow()

  private val _loading = MutableSharedFlow<Boolean>()
  val loading = _loading.asSharedFlow()

  suspend fun request() {
    _loading.emit(true)
    withContext(Dispatchers.IO) {
      /* makes API request */
      /* updates _stateFlow.value */
      /* stateFlow value is successfully collected */
    }
    _loading.emit(false) // emitting boolean value
  }
}
Run Code Online (Sandbox Code Playgroud)
class MyFragment : Fragment(R.layout.fragment_my) {
   // ...

  override …
Run Code Online (Sandbox Code Playgroud)

android kotlin kotlin-coroutines kotlin-flow kotlin-sharedflow

7
推荐指数
2
解决办法
8545
查看次数

Kotlin 流程中的 single() 和 first() 有什么区别?

Kotlin 流程中的 single() 和 first() 有什么区别?您能举例说明何时使用哪种变体吗?

kotlin kotlin-flow

7
推荐指数
1
解决办法
3492
查看次数

难道collectAsStateWithLifecycle只适用于冷流,对热流没有帮助(例如stateFlow)?

最近,自 android 生命周期库版本 2.6.0-alpha01 以来,我们有了一个新的 API,即

\n
collectAsStateWithLifecycle(...)\n
Run Code Online (Sandbox Code Playgroud)\n

这是 Google Developer 在这篇文章中提倡的

\n
\n

如果您\xe2\x80\x99 使用 Jetpack Compose 构建 Android 应用程序,请使用\ncollectAsStateWithLifecycle可组合函数(而不是\n collectAsState

\n
\n

我尝试一下,对于流动(冷流)例如

\n
    val counter = flow {\n        var value = 0\n        while (true) {\n           emit(value++)\n           delay(1000)\n        }\n    }\n
Run Code Online (Sandbox Code Playgroud)\n

拥有它是有用的

\n
flow.collectAsStateWithLifecycle(0)\n
Run Code Online (Sandbox Code Playgroud)\n

但是如果我们有一个像 mutableStateFlow 这样的热流

\n
val stateFlow = MutableStateFlow(0)\n
Run Code Online (Sandbox Code Playgroud)\n

拥有似乎没什么用

\n
stateFlow.collectAsStateWithLifecycle(0)\n
Run Code Online (Sandbox Code Playgroud)\n

鉴于它不会阻止任何排放。

\n

我说collectAsStateWithLifecycle只对冷流有用,对热流无效,这样说对吗?

\n

如果我错了,你能给我举一个collectAsStateWithLifecycle对热流也有用的例子吗?

\n

android android-lifecycle kotlin kotlin-flow kotlin-stateflow

7
推荐指数
1
解决办法
5739
查看次数

何时在 kotlin 中一起使用挂起函数和 Flow 或单独使用?

在审查用 kotlin 编写的一些代码时,有件事引起了我的注意。我在一些项目中查看领域层,在一些项目中,我看到挂起功能和 Flow 一起使用,而在一些项目中,我看到只使用 Flow。

例如暂停和流动在一起

class FetchMovieDetailFlowUseCase @Inject constructor(
    private val repository: MovieRepository
) : FlowUseCase<FetchMovieDetailFlowUseCase.Params, State<MovieDetailUiModel>>() {

    data class Params(val id: Long)

    override suspend fun execute(params: Params): Flow<State<MovieDetailUiModel>> =
        repository.fetchMovieDetailFlow(params.id)
}
Run Code Online (Sandbox Code Playgroud)

只是流动

class GetCoinUseCase @Inject constructor(
    private val repository: CoinRepository
){
 
    operator fun invoke(coinId:String): Flow<Resource<CoinDetail>> = flow {

        try {
            emit(Resource.Loading())
            emit(Resource.Success(coin))

        }catch (e:HttpException){
            emit(Resource.Error(e.localizedMessage ?: "An unexpected error occured"))
        }catch (e:IOException){
            emit(Resource.Error("Couldn't reach server. Check your internet connection."))
        }
    }
}
Run Code Online (Sandbox Code Playgroud)

只是暂停

class GetLatestNewsWithAuthorsUseCase( …
Run Code Online (Sandbox Code Playgroud)

suspend kotlin kotlin-flow

7
推荐指数
1
解决办法
2472
查看次数