Ada*_*old 1 kotlin kotlin-multiplatform kotlin-coroutines
我有一个类,它使用actor来确保共享可变状态的线程安全。我为此做了一个小包装actor以使其易于使用:
interface Ref<T : Any> {
fun get(): T
fun transform(transformer: (T) -> T): Job
}
Run Code Online (Sandbox Code Playgroud)
这里get用于runBlocking阻塞,直到它获取 的实际值T:
override fun get(): T = runBlocking {
val deferred = CompletableDeferred<T>()
launch {
actor.send(RefOperation.Get(deferred))
}
deferred.await()
}
Run Code Online (Sandbox Code Playgroud)
并在transform没有的情况下做类似的事情runBlocking,只返回一个Job:
override fun transform(transformer: (T) -> T): Job {
val job = Job()
launch {
actor.send(RefOperation.Transform(transformer, job))
}
return job
}
Run Code Online (Sandbox Code Playgroud)
这很好,直到一个transform调用指向另一个调用:
ref.transform {
...
ref.transform {
}
}
Run Code Online (Sandbox Code Playgroud)
在这里,我有 2Job秒,但没有办法将它们组合成一个单一的Job,join()如果我想等待它们完成,我可以调用它。
对此的解决方案是结构化并发,但是我不知道如何创建我的actor了,因为它被定义为CoroutineScope.
如何actor在保留使用结构化并发的能力的同时继续使用?
请注意,我之所以创建,Ref是因为我的项目是多平台的,对于 JVM 以外的目标,我使用替代实现。
actor以与添加项目相同的顺序处理项目,并在单个协程中按顺序执行。这意味着innertransform将在outer完成后进行处理transform,并且您在使用时无法更改它actor(actor我们无法启动更多协程,因为我们将我们的状态限制在单个线程中,否则可能会出现循环处理顺序)。试图transform在外部的主体中加入内部的工作transform(如果我们标记transform为挂起的函数)只会导致死锁。
你能接受这样的行为吗?如果没有,请不要使用 actor 或嵌套变换。如果是,请提供一些用例,其中创建transform将在外层之后处理的嵌套transform有意义。
至于加入所有的工作,我有一些代码。在main我们有创建内部变换的外部变换。外层返回2,内层返回8,但是内层是在外层完成后开始的,所以结果是8。但是如你所愿,transformJob.join()in 也在main等待内部工作。
private sealed class RefOperation<T>
private class Get<T : Any>(val deferred: CompletableDeferred<T>) : RefOperation<T>()
private class Transform<T : Any>(val transformer: TransformStub<T>.(T) -> T, val stub: TransformStub<T>, val job: CompletableJob) : RefOperation<T>()
interface Ref<T : Any> {
fun get(): T
fun transform(transformer: TransformStub<T>.(T) -> T): Job
}
interface TransformStub<T : Any> {
fun transform(transformer: TransformStub<T>.(T) -> T): Job
}
private class TransformStubImpl<T : Any>(
val actor: SendChannel<RefOperation<T>>,
val scope: CoroutineScope
) : TransformStub<T> {
override fun transform(transformer: TransformStub<T>.(T) -> T): Job {
return scope.launch {
val childJob: CompletableJob = Job()
val childStub = TransformStubImpl(actor, this)
actor.send(Transform(transformer, childStub, childJob))
childJob.join()
}
}
}
class RefImpl<T : Any>(initialValue: T) : Ref<T> {
private val actorJob = Job()
private val actorScope = CoroutineScope(actorJob)
private val actor = actorScope.actor<RefOperation<T>> {
var value: T = initialValue
for (msg in channel) {
when (msg) {
is Get -> {
println("Get! $value")
msg.deferred.complete(value)
}
is Transform -> {
with(msg) {
val newValue = stub.transformer(value)
println("Transform! $value -> $newValue")
value = newValue
job.complete()
}
}
}
}
}
override fun get(): T = runBlocking {
val deferred = CompletableDeferred<T>()
actor.send(Get(deferred))
deferred.await()
}
override fun transform(transformer: TransformStub<T>.(T) -> T): Job {
val stub = TransformStubImpl(actor, GlobalScope)
return stub.transform(transformer)
}
}
fun main() = runBlocking<Unit> {
val ref: Ref<Int> = RefImpl(0)
val transformJob = ref.transform {
transform { 8 }
2
}
transformJob.join()
ref.get()
}
Run Code Online (Sandbox Code Playgroud)
| 归档时间: |
|
| 查看次数: |
153 次 |
| 最近记录: |