在 Kotlin 中锁定互斥体的正确方法

man*_*tta 7 kotlin kotlin-coroutines

我想使用 Kotlin 协程实现一个简单的线程安全 Buffer,因为项目中已经使用了协程。

该缓冲区将在多线程和单线程上下文中使用,因此suspend fun getMostRecentData()似乎不太合理(请参见下面的代码)。

这是我到目前为止所拥有的。事实上,我必须编写所有代码来锁定互斥体,这让我想知道我是否做错了什么。

无论如何,这是代码:

class SafeBuffer(
    private val dispatcher: CoroutineDispatcher,
    private val bufferSize: Int
    ) {

    private val buffer = LinkedList<MyDataType>()
    private val mutex = Mutex()

    val size: Int
        get() = buffer.size

   // First approach: make a suspend fun
   // Not great because I will need a runBlocking{} statement somewhere, every time I want to access the buffer
   suspend fun getMostRecentData() : MyDataType? {
        mutex.withLock {
            return if (buffer.isEmpty()) null else buffer.last
        }
    }

   // Second approach: use a runBlocking block inside the function
   // Seems like it is missing the purpose of coroutines, and I'm not 
   // sure it is actually thread safe if other context is used somehow?
   fun getMostRecentData() : MyDataType? {
        runBlocking(dispatcher) {
            mutex.withLock {
                return if (buffer.isEmpty()) null else buffer.last
            }
        }
    }

    /**** More code ****/
    (...)

}
Run Code Online (Sandbox Code Playgroud)

那么实现这一目标的最惯用/优雅的方法是什么?

Wil*_*eed 5

扩展我的评论,我认为让缓冲区类仅公开 a 是惯用的suspend fun,因为该类的使用者将负责弄清楚他们想要如何使用它(通过runBlocking或从另一个协程)。如果您经常看到此用例,惯用的方法可能是使用扩展函数来SafeBuffer提供此功能。

协程 API 中到处都使用了扩展函数。在您的代码示例中,evenMutex.withLock被定义为扩展函数。

class SafeBuffer(...) {

    private val buffer = LinkedList<MyDataType>()
    private val mutex = Mutex()

    suspend fun getMostRecentData() : MyDataType? =
        mutex.withLock {
            if (buffer.isEmpty()) null else buffer.last
        }
}

fun SafeBuffer.getMostRecentDataBlocking(): MyDataType? =
    runBlocking {
        getMostRecentData()
    }
Run Code Online (Sandbox Code Playgroud)