hah*_*aha 8 android socketchannel kotlin okhttp kotlinx.coroutines
我想使用 Kotlin(v1.3.0)协程和 java.nio.channels。SocketChannel (NIO) 来替代connectAndroid 中的Socket (阻塞 IO)。因为这可以节省很多线程。
下面的代码因为job.await()在 Kotlin 中挂起函数而无法运行,它只能在 Ktolin 协程块中调用。喜欢launch{..},async{..}。
// this function will be called by Java Code
fun connect(address: InetSocketAddress, connectTimeout: Int): SocketChannel {
// Start a new connection
// Create a non-blocking socket channel
val socketChannel = SocketChannel.open()
socketChannel.configureBlocking(false)
// async calls NIO connect function
val job = GlobalScope.async(oneThreadCtx) {
aConnect(socketChannel, address)
}
// I what to suspend(NOT block) current Java Thread, until connect is success
job.await()
return socketChannel
}
Run Code Online (Sandbox Code Playgroud)
但是,我尝试runBlocking{..}将此函数用作 Java 中的普通函数。但job.await阻止了当前的 Java 线程,而不是 suspend。
那么,我应该如何使用 Kotlin(v1.3.0) 协程来实现这个功能?
正如 Marko 指出的那样,即使阻塞操作在异步协程中,您的代码最终仍会阻塞线程。要真正通过 Java 和 Kotlin 获得您想要的异步行为,您需要使用Socket Channel的异步版本
这样,您就可以获得真正的异步套接字处理。使用该类和 Kotlin 的suspendCoroutine构建器方法,您可以将异步处理程序转换为可挂起的调用。
这是一个实现它以供阅读的示例:
class TcpSocket(private val socket: AsynchronousSocketChannel) {
suspend fun read(buffer: ByteBuffer): Int {
return socket.asyncRead(buffer)
}
fun close() {
socket.close()
}
private suspend fun AsynchronousSocketChannel.asyncRead(buffer: ByteBuffer): Int {
return suspendCoroutine { continuation ->
this.read(buffer, continuation, ReadCompletionHandler)
}
}
object ReadCompletionHandler : CompletionHandler<Int, Continuation<Int>> {
override fun completed(result: Int, attachment: Continuation<Int>) {
attachment.resume(result)
}
override fun failed(exc: Throwable, attachment: Continuation<Int>) {
attachment.resumeWithException(exc)
}
}
}
Run Code Online (Sandbox Code Playgroud)
您可以选择删除我在此处执行的包装,然后asyncRead在 AsynchronousSocketChannel 上公开一个方法,如下所示:
suspend fun AsynchronousSocketChannel.asyncRead(buffer: ByteBuffer): Int {
return suspendCoroutine { continuation ->
this.read(buffer, continuation, ReadCompletionHandler)
}
}
object ReadCompletionHandler : CompletionHandler<Int, Continuation<Int>> {
override fun completed(result: Int, attachment: Continuation<Int>) {
attachment.resume(result)
}
override fun failed(exc: Throwable, attachment: Continuation<Int>) {
attachment.resumeWithException(exc)
}
}
Run Code Online (Sandbox Code Playgroud)
这完全取决于品味以及您的设计目标究竟是什么。您应该能够为初始连接实现类似的方法,就像我在这里为读取所做的那样。
// I what to suspend(NOT block) current Java Thread, until connect is success
job.await()
Run Code Online (Sandbox Code Playgroud)
这不是一个现实的期望。从Java的角度来看,asuspend fun通过返回一个特殊的常量 来暂停其执行COROUTINE_SUSPENDED。您需要 Kotlin 编译器来向您隐藏这一点,并允许您编写可挂起的常规代码。
即使从 Kotlin 的角度来看,您的代码也达不到非阻塞暂停的要求,因为它使用阻塞调用来连接。将该调用提交给另一个线程并不会使它成为非阻塞。
您的代码所做的完全等同于将作业提交给 Java 执行器服务,然后等待其结果。例如,您可以使用CompletableFuture.supplyAsync.
| 归档时间: |
|
| 查看次数: |
7054 次 |
| 最近记录: |