在 Kotlin Native 中,如何将对象保留在单独的线程中,并在不使用 C 指针的情况下从任何其他线程改变其状态?

tre*_*ere 4 multithreading kotlin kotlin-native kotlin-multiplatform

我正在探索 Kotlin Native,并且有一个程序,其中有一群Workers执行并发操作(在 Windows 上运行,但这是一个普遍问题)。

现在,我想添加简单的日志记录。一个组件,它通过将字符串作为新行附加到以“附加”模式保持打开的文件中来简单地记录字符串。

(理想情况下,我只有一个“全局”功能......

fun log(text:String) {...} ] 
Run Code Online (Sandbox Code Playgroud)

...我可以从任何地方打电话,包括从其他工作人员“内部”打电话,这会起作用。这里的含义是,由于 Kotlin Native 关于在线程之间传递对象的规则,执行此操作并不简单(TLDR:您不应该传递可变对象。请参阅: https: //github.com/JetBrains/kotlin-native/blob /master/CONCURRENCY.md#对象传输和冻结)。另外,我的日志函数理想地接受任何冻结的对象。)


我想出的是使用DetachedObjectGraph 的解决方案:

首先,我创建一个分离的记录器对象

val loggerGraph = DetachedObjectGraph { FileLogger("/foo/mylogfile.txt")}
Run Code Online (Sandbox Code Playgroud)

然后使用loggerGraph.asCPointer()asCPointer())获取COpaquePointer分离图:

val myPointer = loggerGraph.asCPointer()
Run Code Online (Sandbox Code Playgroud)

现在我可以将此指针传递给工作人员(通过工作人员的执行函数的生产者 lambda),并在那里使用它。或者我可以将指针存储在@ThreadLocal全局变量中。


对于写入文件的代码,每当我想要记录一行时,我都必须DetachedObjectGraph再次从指针创建一个对象,attach()以便获取对我的 fileLogger 对象的引用:

val fileLogger = DetachedObjectGraph(myPointer).attach()
Run Code Online (Sandbox Code Playgroud)

现在我可以在记录器上调用日志函数:

fileLogger.log("My log message")
Run Code Online (Sandbox Code Playgroud)

这就是我在查看 Kotlin Native 中可用于并发的 API(从 Kotlin 1.3.61 开始)时想到的,但我想知道更好的方法是什么(使用 Kotlin,而不是诉诸 C )。显然,为每行写入创建一个 DetachedObjectGraph 对象是不好的。


人们可以用一种更通用的方式提出这个问题:如何在单独的线程(或工作线程)中保持可变资源打开,并向其发送消息。

旁注:拥有真正使用线程的协程可以解决这个问题,但问题是如何使用当前可用的 API(Kotlin 1.3.61)来解决这个任务。

Kev*_*gan 5

您绝对不应该DetachedObjectGraph以问题中提出的方式使用。没有什么可以阻止您尝试附加到多个线程,或者如果您传递相同的指针,则尝试附加到附加到它的另一个线程后的无效线程。

正如多米尼克提到的,您可以将其保留DetachedObjectGraphAtomicReference. 但是,如果您要保留DetachedObjectGraphan AtomicReference,请确保类型为AtomicRef<DetachedObjectGraph?>忙循环,而 为DetachedObjectGraphnull。这将防止DetachedObjectGraph多个线程使用相同的内容。确保将其设置为 null,并以原子方式重新填充它。

然而,是否FileLogger需要可变呢?如果您正在写入文件,则似乎并非如此。即使是这样,我也会将可变对象隔离给单独的工作人员并向其发送日志消息,而不是在DetachedObjectGraphAtomicRef 内部执行操作。

根据我的经验,DetachedObjectGraph这在生产代码中非常罕见。目前我们没有在任何地方使用它。

要将可变状态隔离到 a Worker,如下所示:


class MutableThing<T:Any>(private val worker:Worker = Worker.start(), producer:()->T){
    private val arStable = AtomicReference<StableRef<T>?>(null)
    init {
        worker.execute(TransferMode.SAFE, {Pair(arStable, producer).freeze()}){
            it.first.value = StableRef.create(it.second()).freeze()
        }
    }
    fun <R> access(block:(T)->R):R{
        return worker.execute(TransferMode.SAFE, {Pair(arStable, block).freeze()}){
            it.second(it.first.value!!.get())
        }.result
    }
}

object Log{
    private val fileLogger = MutableThing { FileLogger() }

    fun log(s:String){
        fileLogger.access { fl -> fl.log(s) }
    }
}

class FileLogger{
    fun log(s:String){}
}
Run Code Online (Sandbox Code Playgroud)

MutableThing内部使用StableRefproducer使您想要隔离的可变状态。要记录某些内容,请调用Log.log,这将最终调用可变的FileLogger

要查看 的基本示例MutableThing,请运行以下测试:

@Test
fun goIso(){
    val mt = MutableThing { mutableListOf("a", "b")}
    val workers = Array(4){Worker.start()}
    val futures = mutableListOf<Future<*>>()
    repeat(1000) { rcount ->
        val future = workers[rcount % workers.size].execute(
            TransferMode.SAFE,
            { Pair(mt, rcount).freeze() }
        ) { pair ->
            pair.first.access {
                val element = "ttt ${pair.second}"
                println(element)
                it.add(element)
            }
        }
        futures.add(future)
    }

    futures.forEach { it.result }

    workers.forEach { it.requestTermination() }

    mt.access {
        println("size: ${it.size}")
    }
}
Run Code Online (Sandbox Code Playgroud)

  • 实现了更正式的版本,并附有相关的博客文章:https://dev.to/touchlab/kotlin-native-transferring-state-4n8i (2认同)