在 RxJava 中,有一个valve运算符允许暂停(和缓冲)流并再次恢复流(并且在恢复后立即发出缓冲的值)。它是 rx java 扩展的一部分(https://github.com/akarnokd/RxJavaExtensions/blob/3.x/src/main/java/hu/akarnokd/rxjava3/operators/FlowableValve.java)。
kotlin 流程有类似的东西吗?
我的用例是我想观察活动内的流程并且永远不会丢失事件(就像我会用LiveDataeg 做的那样,如果活动暂停,它将停止观察数据)。因此,当活动暂停时,我希望流程缓冲观察到的值,直到活动恢复为止,并在活动恢复后立即将它们全部发出。
因此,当活动被创建时(直到它被销毁),我想观察流程,但我只想在活动处于活动状态时发出值,并在活动不活动(但仍然创建)时缓冲值,直到它再次活动。
有什么办法可以解决这个问题或者有人写过什么东西来解决这个问题吗?
我正在学习 kotlin 协程和流程,有一件事对我来说有点晦涩难懂。如果我的常规协程有一个很长的运行循环,我可以使用 isActive 或 EnsureActive 来处理取消。然而,这些并不是为流程定义的,但以下代码正确地完成了流程:
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.flow.*
import kotlinx.coroutines.runBlocking
import org.slf4j.LoggerFactory
private val logger = LoggerFactory.getLogger("Main")
fun main() {
val producer = FlowProducer()
runBlocking {
producer
.produce()
.take(10)
.collect {
logger.info("Received $it")
}
}
logger.info("done")
}
class FlowProducer {
fun produce() = flow {
try {
var counter = 1
while (true) {
logger.info("Before emit")
emit(counter++)
logger.info("After emit")
}
}finally {
logger.info("Producer has finished")
}
}.flowOn(Dispatchers.IO)
}
Run Code Online (Sandbox Code Playgroud)
为什么会出现这样的情况呢?是因为emit是一个可挂起的函数来为我处理取消吗?如果有条件地调用发射该怎么办?例如,该循环实际上轮询来自 Kafka 的记录,并且仅当接收到的记录不为空时才会发出调用。那么我们可以有这样的情况:
我刚开始使用Flows. 我遇到的情况是,我需要在调用之前等待userLoginStatusChangedFlow收集readProfileFromFirestore()(这也会收集Flow)。第一个检查用户是否已登录 Firebase Auth,而第二个则从 Firestore 下载用户的个人资料信息。我的代码可以工作,但我不确定我是否按照预期的方式进行操作。
Flows问:这样“连锁”是标准做法吗?你会采取不同的做法吗?
init {
viewModelScope.launch {
repository.userLoginStatusChangedFlow.collect { userLoggedIn: Boolean? ->
if (userLoggedIn == true) {
launch {
readProfileFromFirestore()
}
} else {
navigateToLoginFragment()
}
}
}
}
Run Code Online (Sandbox Code Playgroud)
readProfileFromFirestore()上面调用的方法:
// Download profile from Firestore and update the repository's cached profile.
private suspend fun readProfileFromFirestore() {
repository.readProfileFromFirestoreFlow().collect { state ->
when (state) {
is State.Success -> {
val profile: Models.Profile? = state.data
if (profile != null …Run Code Online (Sandbox Code Playgroud) 我正在进行 2 个并行 API 调用来请求分页数据。我需要在获取结果后合并两个 API 调用的结果并将分页数据提交给适配器。
viewLifecycleOwner.lifecycleScope.launchWhenStarted {
val assignedList = async {
assignedTaskViewModel.getPagingTasks(getAssignedTasks)
}
val unscheduledTasksList = async {
assignedTaskViewModel.getUnscheduledChores(getUnscheduledTasks)
}
unscheduledTasksList.await().combine(assignedList.await()) { unschedule, assign ->
Timber.d("combining 2 calls")
//TODO need to show unscheduledTasksList first followed by assignedList
}.collectLatest {
it.map { value ->
Timber.d("paging data = $value")
}
assignedTasksPagingAdapter?.submitData(it)
}
}
Run Code Online (Sandbox Code Playgroud)
有没有办法转换 pagingData,以便我可以合并 2 个 API 调用的结果并将其作为单个分页数据提交给适配器?
嘿,我是 kotlin flow的新手。我正在尝试打印流量大小。众所周知,列表具有size()功能。我们有类似的flow函数吗?
val list = mutableListof(1,2,3)
println(list.size)
Run Code Online (Sandbox Code Playgroud)
输出
2
Run Code Online (Sandbox Code Playgroud)
我们如何获得流量中的大小值?
dataMutableStateFlow.collectLatest { data ->
???
}
Run Code Online (Sandbox Code Playgroud)
谢谢
我正在编写一个解码器,它将接收一系列字节缓冲区并将内容解码为单个String. 可以有任意数量的字节缓冲区,每个缓冲区包含任意数量的字节。缓冲区不一定在字符边界上分割,因此根据编码,它们可能在开头或结尾包含部分字符。这就是我想要做的,StringByteStreamDecoder我需要编写的新类在哪里。
suspend fun decode(data: Flow<ByteBuffer>, charset: Charset): String {
val decoder = StringByteStreamDecoder(charset)
data.collect { bytes ->
decoder.feed(bytes)
}
decoder.endOfInput()
return decoder.toString()
}
Run Code Online (Sandbox Code Playgroud)
最简单的方法是将所有字节缓冲区收集到一个字节数组中。我拒绝了这种方法,因为它具有显着的内存开销。它需要为完整消息分配空间至少两次:一次为原始字节,一次为解码字符。这是我的简单实现,使用 aByteArrayOutputStream作为扩展字节缓冲区。
class StringByteStreamDecoder(private val charset: Charset) {
private val buffer = ByteArrayOutputStream()
fun feed(data: ByteBuffer) {
if (data.hasArray()) {
buffer.write(data.array(), data.position() + data.arrayOffset(), data.remaining())
} else {
val array = ByteArray(data.remaining())
data.get(array)
buffer.write(array, 0, array.size)
}
}
fun endOfInput() {
buffer.flush()
}
override fun toString(): String {
return …Run Code Online (Sandbox Code Playgroud) 我的视图模型
val username = MutableStateFlow("")
val password = MutableStateFlow("")
val state = combineFlows(
username,
password,
) { username, password->
LoginViewState(
username = username,
password = password,
)
}.stateIn(viewModelScope, WhileSubcribed(5000), initial = LoginViewState.Initial)
Run Code Online (Sandbox Code Playgroud)
我可以测试用户名变量是否更新或不使用涡轮机,如下所示:
@Test
fun testIfCredentialsUpdate() = runTest {
viewModel.onUsernameChange("usr")
viewModel.username.test {
assertThat(awaitItem()).isEqualTo("usr")
cancel()
}
}
Run Code Online (Sandbox Code Playgroud)
但我无法测试在我的用户名变量更新后,我的状态是否更新。测试失败,我的视图状态始终处于初始/默认阶段。如果有更好的方法来测试已合并的视图状态,请告诉我:)
因此,我尝试在 onCreate() 中从前台服务 (LifecycleService) 中的流收集数据,但在第一次回调后,它没有提供新数据。
代码是:
override fun onCreate() {
super.onCreate()
lifecycleScope.launchWhenStarted {
repeatOnLifecycle(Lifecycle.State.STARTED) {
observeCoinsPrices()
}
}
}
Run Code Online (Sandbox Code Playgroud) service android foreground-service kotlin-coroutines kotlin-flow
我有一个存储库,它创建一个流程,在其中发出挂起的 Retrofit 方法的结果。这在应用程序中有效,但我想对代码运行测试。
我在测试中使用 kotlinx-coroutines-test v1.6.0 和 MockWebServer v4.9.3。当我尝试运行测试时,我得到:
Timed out waiting for 1000 ms
kotlinx.coroutines.TimeoutCancellationException: Timed out waiting for 1000 ms
at app//kotlinx.coroutines.TimeoutKt.TimeoutCancellationException(Timeout.kt:184)
at app//kotlinx.coroutines.TimeoutCoroutine.run(Timeout.kt:154)
at app//kotlinx.coroutines.test.TestDispatcher.processEvent$kotlinx_coroutines_test(TestDispatcher.kt:23)
at app//kotlinx.coroutines.test.TestCoroutineScheduler.tryRunNextTask(TestCoroutineScheduler.kt:95)
at app//kotlinx.coroutines.test.TestCoroutineScheduler.advanceUntilIdle(TestCoroutineScheduler.kt:110)
at app//kotlinx.coroutines.test.TestBuildersKt__TestBuildersKt.runTestCoroutine(TestBuilders.kt:212)
at app//kotlinx.coroutines.test.TestBuildersKt.runTestCoroutine(Unknown Source)
at app//kotlinx.coroutines.test.TestBuildersKt__TestBuildersKt$runTest$1$1.invokeSuspend(TestBuilders.kt:167)
at app//kotlinx.coroutines.test.TestBuildersKt__TestBuildersKt$runTest$1$1.invoke(TestBuilders.kt)
at app//kotlinx.coroutines.test.TestBuildersKt__TestBuildersKt$runTest$1$1.invoke(TestBuilders.kt)
at app//kotlinx.coroutines.test.TestBuildersJvmKt$createTestResult$1.invokeSuspend(TestBuildersJvm.kt:13)
(Coroutine boundary)
at app.cash.turbine.ChannelBasedFlowTurbine$awaitEvent$2.invokeSuspend(FlowTurbine.kt:247)
at app.cash.turbine.ChannelBasedFlowTurbine$withTimeout$2.invokeSuspend(FlowTurbine.kt:215)
at app.cash.turbine.ChannelBasedFlowTurbine.awaitItem(FlowTurbine.kt:252)
at ogbe.eva.prompt.home.HomeRepositoryTest$currentTask when server responds with error emits failure$1$1.invokeSuspend(HomeRepositoryTest.kt:90)
at app.cash.turbine.FlowTurbineKt$test$2.invokeSuspend(FlowTurbine.kt:86)
at ogbe.eva.prompt.home.HomeRepositoryTest$currentTask when server responds with error emits failure$1.invokeSuspend(HomeRepositoryTest.kt:89)
at kotlinx.coroutines.test.TestBuildersKt__TestBuildersKt$runTestCoroutine$2.invokeSuspend(TestBuilders.kt:208)
(Coroutine creation stacktrace)
at app//kotlinx.coroutines.intrinsics.UndispatchedKt.startCoroutineUndispatched(Undispatched.kt:184) …Run Code Online (Sandbox Code Playgroud) 我有一个CoroutineScope和log()函数的实例,如下所示:
private val scope = CoroutineScope(Dispatchers.IO)
fun log(message: String) = scope.launch { // launching a coroutine
println("$message")
TimeUnit.MILLISECONDS.sleep(100) // some blocking operation
}
Run Code Online (Sandbox Code Playgroud)
我使用这个测试代码来启动协程:
repeat(5) { item ->
log("Log $item")
}
Run Code Online (Sandbox Code Playgroud)
该log()函数可以从任何位置、任何位置调用Thread,但不能从协程调用。
经过几次测试后,我可以看到不连续的结果,如下所示:
Log 0
Log 2
Log 4
Log 1
Log 3
Run Code Online (Sandbox Code Playgroud)
打印日志的顺序可以不同。如果我理解正确的话,协程的执行并不能保证是顺序的。这意味着协程 foritem 2可以在协程 for 之前启动item 0。
我希望协程针对每个项目按顺序启动,并且“某些阻塞操作”将按顺序执行,以始终实现下一个日志:
Log 0
Log 1
Log 2
Log 3
Log 4
Run Code Online (Sandbox Code Playgroud)
有没有办法让协同程序按顺序启动?或者也许还有其他方法可以实现我想要的?
预先感谢您的任何帮助!
kotlin-flow ×10
kotlin ×9
android ×6
coroutine ×2
bytebuffer ×1
nio ×1
nonblocking ×1
retrofit2 ×1
service ×1
viewmodel ×1