如何在保持 Kotlin 结构化并发能力的同时使用 actor?

Ada*_*old 1 kotlin kotlin-multiplatform kotlin-coroutines

我有一个类,它使用actor来确保共享可变状态的线程安全。我为此做了一个小包装actor以使其易于使用:

interface Ref<T : Any> {

    fun get(): T

    fun transform(transformer: (T) -> T): Job

}
Run Code Online (Sandbox Code Playgroud)

这里get用于runBlocking阻塞,直到它获取 的实际值T

override fun get(): T = runBlocking {
    val deferred = CompletableDeferred<T>()
    launch {
        actor.send(RefOperation.Get(deferred))
    }
    deferred.await()
}
Run Code Online (Sandbox Code Playgroud)

并在transform没有的情况下做类似的事情runBlocking,只返回一个Job

override fun transform(transformer: (T) -> T): Job {
    val job = Job()
    launch {
        actor.send(RefOperation.Transform(transformer, job))
    }
    return job
}
Run Code Online (Sandbox Code Playgroud)

这很好,直到一个transform调用指向另一个调用:

ref.transform {

  ...
  ref.transform {

  }
}
Run Code Online (Sandbox Code Playgroud)

在这里,我有 2Job秒,但没有办法将它们组合成一个单一的Jobjoin()如果我想等待它们完成,我可以调用它。

对此的解决方案是结构化并发,但是我不知道如何创建我的actor了,因为它被定义为CoroutineScope.

如何actor在保留使用结构化并发的能力的同时继续使用?

请注意,我之所以创建,Ref是因为我的项目是多平台的,对于 JVM 以外的目标,我使用替代实现。

ard*_*nit 5

actor以与添加项目相同的顺序处理项目,并在单个协程中按顺序执行。这意味着innertransform将在outer完成后进行处理transform,并且您在使用时无法更改它actoractor我们无法启动更多协程,因为我们将我们的状态限制在单个线程中,否则可能会出现循环处理顺序)。试图transform在外部的主体中加入内部的工作transform(如果我们标记transform为挂起的函数)只会导致死锁。

你能接受这样的行为吗?如果没有,请不要使用 actor 或嵌套变换。如果是,请提供一些用例,其中创建transform将在外层之后处理的嵌套transform有意义。

至于加入所有的工作,我有一些代码。在main我们有创建内部变换的外部变换。外层返回2,内层返回8,但是内层是在外层完成后开始的,所以结果是8。但是如你所愿,transformJob.join()in 也在main等待内部工作。

private sealed class RefOperation<T>
private class Get<T : Any>(val deferred: CompletableDeferred<T>) : RefOperation<T>()
private class Transform<T : Any>(val transformer: TransformStub<T>.(T) -> T, val stub: TransformStub<T>, val job: CompletableJob) : RefOperation<T>()

interface Ref<T : Any> {

    fun get(): T

    fun transform(transformer: TransformStub<T>.(T) -> T): Job

}

interface TransformStub<T : Any> {
    fun transform(transformer: TransformStub<T>.(T) -> T): Job
}

private class TransformStubImpl<T : Any>(
        val actor: SendChannel<RefOperation<T>>,
        val scope: CoroutineScope
) : TransformStub<T> {

    override fun transform(transformer: TransformStub<T>.(T) -> T): Job {
        return scope.launch {
            val childJob: CompletableJob = Job()
            val childStub = TransformStubImpl(actor, this)
            actor.send(Transform(transformer, childStub, childJob))
            childJob.join()
        }
    }

}

class RefImpl<T : Any>(initialValue: T) : Ref<T> {

    private val actorJob = Job()
    private val actorScope = CoroutineScope(actorJob)
    private val actor = actorScope.actor<RefOperation<T>> {
        var value: T = initialValue
        for (msg in channel) {
            when (msg) {
                is Get -> {
                    println("Get! $value")
                    msg.deferred.complete(value)
                }
                is Transform -> {
                    with(msg) {
                        val newValue = stub.transformer(value)
                        println("Transform! $value -> $newValue")
                        value = newValue
                        job.complete()
                    }
                }
            }
        }
    }

    override fun get(): T = runBlocking {
        val deferred = CompletableDeferred<T>()
        actor.send(Get(deferred))
        deferred.await()
    }

    override fun transform(transformer: TransformStub<T>.(T) -> T): Job {
        val stub = TransformStubImpl(actor, GlobalScope)
        return stub.transform(transformer)
    }

}

fun main() = runBlocking<Unit> {
    val ref: Ref<Int> = RefImpl(0)
    val transformJob = ref.transform {
        transform { 8 }
        2
    }
    transformJob.join()
    ref.get()
}
Run Code Online (Sandbox Code Playgroud)