如何在 Java 代码中使用 Kotlin 协程实现 NIO Socket(客户端)?

hah*_*aha 8 android socketchannel kotlin okhttp kotlinx.coroutines

我想使用 Kotlin(v1.3.0)协程和 java.nio.channels。SocketChannel (NIO) 来替代connectAndroid 中的Socket (阻塞 IO)。因为这可以节省很多线程。

下面的代码因为job.await()在 Kotlin 中挂起函数而无法运行,它只能在 Ktolin 协程块中调用。喜欢launch{..}async{..}

// this function will be called by Java Code
fun connect(address: InetSocketAddress, connectTimeout: Int): SocketChannel {

    // Start a new connection
    // Create a non-blocking socket channel
    val socketChannel = SocketChannel.open()
    socketChannel.configureBlocking(false)

    // async calls NIO connect function
    val job = GlobalScope.async(oneThreadCtx) {
        aConnect(socketChannel, address)
    }

    // I what to suspend(NOT block) current Java Thread, until connect is success
    job.await()

    return socketChannel
}
Run Code Online (Sandbox Code Playgroud)

但是,我尝试runBlocking{..}将此函数用作 Java 中的普通函数。但job.await阻止了当前的 Java 线程,而不是 suspend

那么,我应该如何使用 Kotlin(v1.3.0) 协程来实现这个功能?

Pat*_*mer 7

正如 Marko 指出的那样,即使阻塞操作在异步协程中,您的代码最终仍会阻塞线程。要真正通过 Java 和 Kotlin 获得您想要的异步行为,您需要使用Socket Channel的异步版本

这样,您就可以获得真正的异步套接字处理。使用该类和 Kotlin 的suspendCoroutine构建器方法,您可以将异步处理程序转换为可挂起的调用。

这是一个实现它以供阅读的示例:

class TcpSocket(private val socket: AsynchronousSocketChannel) {
    suspend fun read(buffer: ByteBuffer): Int {
        return socket.asyncRead(buffer)
    }

    fun close() {
        socket.close()
    }

    private suspend fun AsynchronousSocketChannel.asyncRead(buffer: ByteBuffer): Int {
        return suspendCoroutine { continuation ->
           this.read(buffer, continuation, ReadCompletionHandler)
        }
    }

    object ReadCompletionHandler : CompletionHandler<Int, Continuation<Int>> {
        override fun completed(result: Int, attachment: Continuation<Int>) {
            attachment.resume(result)
        }

        override fun failed(exc: Throwable, attachment: Continuation<Int>) {
            attachment.resumeWithException(exc)
        }
    }
}
Run Code Online (Sandbox Code Playgroud)

您可以选择删除我在此处执行的包装,然后asyncRead在 AsynchronousSocketChannel 上公开一个方法,如下所示:

suspend fun AsynchronousSocketChannel.asyncRead(buffer: ByteBuffer): Int {
    return suspendCoroutine { continuation ->
       this.read(buffer, continuation, ReadCompletionHandler)
    }
}

object ReadCompletionHandler : CompletionHandler<Int, Continuation<Int>> {
    override fun completed(result: Int, attachment: Continuation<Int>) {
        attachment.resume(result)
    }

    override fun failed(exc: Throwable, attachment: Continuation<Int>) {
        attachment.resumeWithException(exc)
    }
}
Run Code Online (Sandbox Code Playgroud)

这完全取决于品味以及您的设计目标究竟是什么。您应该能够为初始连接实现类似的方法,就像我在这里为读取所做的那样。

  • 对于任何情况,[这里是 Kotlin 存储库中的问题链接](https://github.com/Kotlin/kotlinx.coroutines/issues/601),其中包含 Nio.kt 文件的链接,其中此方法是为其他异步方法实现的。 (2认同)

Mar*_*nik 1

// I what to suspend(NOT block) current Java Thread, until connect is success
job.await()
Run Code Online (Sandbox Code Playgroud)

这不是一个现实的期望。从Java的角度来看,asuspend fun通过返回一个特殊的常量 来暂停其执行COROUTINE_SUSPENDED。您需要 Kotlin 编译器来向您隐藏这一点,并允许您编写可挂起的常规代码。

即使从 Kotlin 的角度来看,您的代码也达不到非阻塞暂停的要求,因为它使用阻塞调用来连接。将该调用提交给另一个线程并不会使它成为非阻塞。

您的代码所做的完全等同于将作业提交给 Java 执行器服务,然后等待其结果。例如,您可以使用CompletableFuture.supplyAsync.

  • 解决办法是什么? (2认同)