Pre*_*rem 11 kotlin kotlin-coroutines kotlin-flow
我试图在我的类中保留可变的状态流,但是当我对其应用任何方法时,它将转换为不可变的Flow<T>
:
class MyClass : Listener<String> {
private val source = Source()
val flow: Flow<String?>
get() = _flow
// region listener
override fun onUpdate(value: String?) {
if (value!= null) {
// emit object changes to the flow
// not possible, because the builder operators on the flow below convert it to a `Flow` and it doesn't stay as a `MutableSharedFlow` :(
_flow.tryEmit(value)
}
}
// end-region
@OptIn(ExperimentalCoroutinesApi::class)
private val _flow by lazy {
MutableStateFlow<String?>(null).onStart {
emitAll(
flow<String?> {
val initialValue = source.getInitialValue()
emit(initialValue)
}.flowOn(MyDispatchers.background)
)
}.onCompletion { error ->
// when the flow is cancelled, stop listening to changes
if (error is CancellationException) {
// is was cancelled
source.removeListener(this@MyClass)
}
}.apply {
// listen to changes and send them to the flow
source.addListener(this@MyClass)
}
}
}
Run Code Online (Sandbox Code Playgroud)
MutableStateFlow
在我应用这些方法之后,有没有办法保持流程不变onCompletion/onStart
?
我想这个extension
功能可以解决你的问题
public fun <T> Flow<T>.mutableStateIn(
scope: CoroutineScope,
initialValue: T
): MutableStateFlow<T> {
val flow = MutableStateFlow(initialValue)
scope.launch {
this@mutableStateIn.collect(flow)
}
return flow
}
Run Code Online (Sandbox Code Playgroud)
如果将转换应用于可变状态流,则生成的流将变为只读,因为原始流充当其源。如果您想手动发出事件,则需要将它们发送到初始源流。
话虽如此,您想要在这里实现的目标似乎非常简单:将基于回调的 API 桥接到 API Flow
。Kotlin 协程中有一个内置函数可以执行此操作,称为callbackFlow。
我不确定你的源 API 如何处理背压,但它看起来像这样:
@OptIn(ExperimentalCoroutinesApi::class)
fun Source.asFlow(): Flow<String?> = callbackFlow {
send(getInitialValue())
val listener = object : Listener<String> {
override fun onUpdate(value: String?) {
if (value != null) {
trySend(value)
}
}
}
addListener(listener)
awaitClose {
removeListener(listener)
}
}
Run Code Online (Sandbox Code Playgroud)
或者可能用runBlocking { send(value) }
代替trySend()
,具体取决于如何Source
处理其自己的线程池中的背压和阻塞。
请注意,flowOn
可能会在此流程之上使用,但它只对 真正重要getInitialValue()
,因为执行回调的线程Source
无论如何都是由 控制的。
如果添加许多侦听器对于 来说成本高昂Source
,您还可以考虑使用shareIn()
运算符共享此流,以便多个订阅者共享相同的侦听器订阅。
归档时间: |
|
查看次数: |
10841 次 |
最近记录: |