相当于 Kotlin 协程流程中的 RxJava .toList()

Car*_*mer 8 kotlin rx-java kotlin-coroutines kotlin-flow

我遇到一种情况,我需要观察 userId,然后使用这些 userId 来观察用户。userIds 或用户可能随时更改,我希望使发出的用户保持最新状态。这是我拥有的数据源的示例:


data class User(val name: String)

fun observeBestUserIds(): Flow<List<String>> {
    return flow {
        emit(listOf("abc", "def"))
        delay(500)
        emit(listOf("123", "234"))
    }
}

fun observeUserForId(userId: String): Flow<User> {
    return flow {
        emit(User("${userId}_name"))
        delay(2000)
        emit(User("${userId}_name_updated"))
    }
}
Run Code Online (Sandbox Code Playgroud)

在这种情况下,我希望排放量为:

[User(abc_name), User(def_name)], 然后

[User(123_name), User(234_name)], 然后

[User(123_name_updated), User(234_name_updated)]

我想我可以在 RxJava 中实现这一点,如下所示:

observeBestUserIds.concatMapSingle { ids ->
    Observable.fromIterable(ids)
        .concatMap { id ->
            observeUserForId(id)
        }
        .toList()
}
Run Code Online (Sandbox Code Playgroud)

我应该编写什么函数来生成发出该信号的流?

Rya*_*ley 6

我相信您正在寻找combine,它为您提供了一个可以轻松调用的数组toList()

observeBestUserIds().collectLatest { ids ->
    combine(
        ids.map { id -> observeUserForId(id) }
    ) {
        it.toList()
    }.collect {
        println(it)
    } 
}
Run Code Online (Sandbox Code Playgroud)

这是具有更明确参数名称的内部部分,因为您在 Stack Overflow 上看不到 IDE 的类型提示:

combine(
    ids.map { id -> observeUserForId(id) }
) { arrayOfUsers: Array<User> ->
    arrayOfUsers.toList()
}.collect { listOfUsers: List<User> ->
    println(listOfUsers)
}
Run Code Online (Sandbox Code Playgroud)

输出:

[User(name=abc_name), User(name=def_name)]
[User(name=123_name), User(name=234_name)]
[User(name=123_name_updated), User(name=234_name)]
[User(name=123_name_updated), User(name=234_name_updated)]
Run Code Online (Sandbox Code Playgroud)

现场演示(请注意,在演示中,所有输出都会同时出现,但这是演示站点的限制 - 这些行的出现时间与代码在本地运行时预期的时间相同)

这避免了原始问题中讨论的( abc_name_updated, ) 。def_name_updated但是,仍然存在 和 的中间发射,123_name_updated因为234_name123_name_updated首先发射的,并且它立即发送组合版本,因为它们是每个流中的最新版本。

然而,这可以通过消除发射来避免(在我的机器上,小至 1 毫秒的超时就可以了,但为了保守起见,我选择了 20 毫秒):

[User(name=abc_name), User(name=def_name)]
[User(name=123_name), User(name=234_name)]
[User(name=123_name_updated), User(name=234_name)]
[User(name=123_name_updated), User(name=234_name_updated)]
Run Code Online (Sandbox Code Playgroud)

这会让你得到你想要的确切输出:

[User(name=abc_name), User(name=def_name)]
[User(name=123_name), User(name=234_name)]
[User(name=123_name_updated), User(name=234_name_updated)]
Run Code Online (Sandbox Code Playgroud)

现场演示