Kotlin:具有非阻塞 I/O 的阻塞协程

m0s*_*it0 1 multithreading coroutine kotlin

我正在尝试使用 Kotlin 协程来处理非阻塞 I/O。场景如下:

  1. 从线程 1 上运行的异步回调接收数据。
  2. 在线程 2 中等待该数据然后使用它。

我当前的代码如下所示(为了简洁起见,进行了简化):

private var latch = CountDownLatch(1)
private var data: Any? = null

// Async callback from non-blocking I/O
fun onReceive(data: Any) {
    currentData = data
    latch.countDown()
}

// Wait and consume data
fun getData(): Any? {
    latch.await()
    latch = CountDownLatch(1)
    return currentData
}

fun processData() {
    launch(CommonPool) {
        while (true) {
            val data = getData()
            // Consume data                
        }
    }
}
Run Code Online (Sandbox Code Playgroud)

据我了解,Kotlin 协程应该能够帮助我摆脱 CountDownLatch。读完这个(很棒的)指南后,我能想到的就是这样的:

// Wait and consume data
fun getData() = async(CommonPool) {
    latch.await()
    latch = CountDownLatch(1)
    currentData
}

fun processData() {
    launch(CommonPool) {
        while (true) {
            runBlocking {
                val data = getData().await()
                // Consume data                
            }
        }
    }
}
Run Code Online (Sandbox Code Playgroud)

我也尝试过Pipelines,得到了类似的结果。我显然不明白如何使用这些功能。

jai*_*ior 6

在使用 Android 进行开发时,有一个非常常见的模式CountDownLatch,有时您希望在处理时实现异步实现BroadcastReceiversCountDownLatch这非常方便。

private suspend fun yourSuspendMethod() {
    val job = GlobalScope.async {    
        val latch = CountDownLatch(1)

        val watcher = object : BroadcastReceiver() {

            override fun onReceive(context: Context?, intent: Intent?) {
                if // your logic
                    latch.countDown()
            }
        }

        try {
            mContext?.registerReceiver(watcher, IntentFilter(...))

            //call a method that will trigger the broadcast receiver

            if (!latch.await(5, TimeUnit.SECONDS)) {
                throw Exception("Failed .... on latch's timeout")
            }
        } finally {
            mContext?.unregisterReceiver(watcher)
        }
    }

    job.await()
}
Run Code Online (Sandbox Code Playgroud)

这里有一件非常重要的事情,不要更改 CoroutineScope 的上下文,否则它们将在完全不同的地方运行,通过执行我上面留下的方式,它会成为范围/上下文的子级。

[编辑] 我决定对这个问题投入更多的思考以避免使用CountDownLatch. 锁存器的问题在于,当您调用latch.await它时,它将停止当前线程,因此如果这是来自主线程,则主线程将等待并且超时,因为它没有给接收者调用的机会。解决这个问题的一种方法是使用我上面使用的示例。

在上面的示例中我忘记的一件事是,如果您想进行单元测试并同步调用者的上下文,您将需要注入上下文。如果您决定这样做,您的实现将变得更加复杂,并且您将无法使用协程的全部功能,因为您将创建额外的线程。

withTimeout因此,解决方案是使用+的组合suspendCancellableCoroutine,您可以使用此扩展:

suspend inline fun <T> suspendCoroutineWithTimeout(
    timeout: Long,
    crossinline block: (Continuation<T>) -> Unit
) = withTimeout(timeout) {
    suspendCancellableCoroutine(block = block)
}
Run Code Online (Sandbox Code Playgroud)

你的方法将如下所示:

private suspend fun yourSuspendMethod() {
    var watcher: BroadcastReceiver? = null

    try {
        suspendCoroutineWithTimeout<Boolean>(TimeUnit.SECONDS.toMillis(5)) {
            watcher = object : BroadcastReceiver() {

                override fun onReceive(context: Context?, intent: Intent?) {
                    if // your logic
                        it.resume(true)
                }
            }

            context?.registerReceiver(watcher, IntentFilter(...))
            //call a method that will trigger the broadcast receiver
        }
    } finally {
        context?.unregisterReceiver(watcher)
    }
}
Run Code Online (Sandbox Code Playgroud)

就这样了。现在,协程将在不停止调用者线程的情况下发挥其魔力,并且当作业被取消时,超时也将取消。