非阻塞 I/O 和 Kotlin 协程之间有什么关系?

Mar*_*rco 3 nonblocking kotlin ktor kotlin-coroutines ktor-client

Kotlin 协程和非阻塞 I/O 之间有什么关系?其中之一是否暗示着另一个?如果我使用阻塞 I/O 会发生什么?这对性能有何影响?

Mar*_*rco 6

协程被设计为包含非阻塞(即CPU 密集型)代码。这就是为什么默认协程调度程序 \xe2\x80\x93 Dispatchers.Default \xe2\x80\x93 共有线程max(2, num_of_cpus)来执行调度的协程。例如,默认情况下,高度并发的程序(例如在具有 2 个 CPU 的计算机中运行的 Web 服务器)的计算能力会下降 50%,同时线程会阻塞等待 I/O 在协程中完成。

\n

不过,非阻塞 I/O 并不是协程的一个功能。协程只是提供了一种更简单的编程模型,其中包含挂起函数,而不是Java 中难以阅读的CompletableFuture<T>延续,以及其他概念中的结构化并发

\n
\n

要了解协程和非阻塞 I/O 如何协同工作,这里有一个实际示例:

\n

server.js:一个简单的 Node.js HTTP 服务器,用于接收请求,然后返回响应~5s

\n
const { createServer } = require("http");\n\nlet reqCount = 0;\nconst server = createServer(async (req, res) => {\n    const { method, url } = req;\n    const reqNumber = ++reqCount;\n    console.log(`${new Date().toISOString()} [${reqNumber}] ${method} ${url}`);\n    \n    await new Promise((resolve) => setTimeout(resolve, 5000));\n    res.end("Hello!\\n");\n    console.log(`${new Date().toISOString()} [${reqNumber}] done!`);\n});\n\nserver.listen(8080);\nconsole.log("Server started!");\n
Run Code Online (Sandbox Code Playgroud)\n

main.kt:使用三种实现向 Node.js 服务器发送 128 个 HTTP 请求:

\n

1. :在 Dispatchers.IOwithJdkClientBlocking()调度的协程内调用 JDK11 java.net.http.HttpClient的阻塞 I/O 方法。

\n
import java.net.URI\nimport java.net.http.HttpClient as JDK11HttpClient\nimport java.net.http.HttpRequest as JDK11HttpRequest\nimport java.net.http.HttpResponse as JDK11HttpResponse\nimport kotlinx.coroutines.Dispatchers\nimport kotlinx.coroutines.withContext\n\nfun withJdkClientBlocking() {\n    println("Running with JDK11 client using blocking send()")\n\n    val client = JDK11HttpClient.newHttpClient()\n    runExample {\n        // Sometimes you can\'t avoid coroutines with blocking I/O methods.\n        // These must be always be dispatched by Dispatchers.IO.\n        withContext(Dispatchers.IO) {\n            // Kotlin compiler warns this is a blocking I/O method.\n            val response = client.send(\n                JDK11HttpRequest.newBuilder(URI("http://localhost:8080")).build(),\n                JDK11HttpResponse.BodyHandlers.ofString()\n            )\n            // Return status code.\n            response.statusCode()\n        }\n    }\n}\n
Run Code Online (Sandbox Code Playgroud)\n

2. withJdkClientNonBlocking():调用JDK11java.net.HttpClient非阻塞I/O方法。这些方法返回一个,其结果使用kotlinx-coroutines-jdk8中的CompletionStage<T>.await()互操作扩展函数CompletableFuture<T>来使用。即使 I/O 不会阻塞任何线程,异步请求/响应编组/解编组也在 Java Executor上运行,因此该示例使用单线程执行器来说明单个线程如何处理由于非-阻塞I/O。

\n
import java.net.URI\nimport java.net.http.HttpClient as JDK11HttpClient\nimport java.net.http.HttpRequest as JDK11HttpRequest\nimport java.net.http.HttpResponse as JDK11HttpResponse\nimport java.util.concurrent.Executors\nimport kotlinx.coroutines.future.await\n\nfun withJdkClientNonBlocking() {\n    println("Running with JDK11 client using non-blocking sendAsync()")\n\n    val httpExecutor = Executors.newSingleThreadExecutor()\n    val client = JDK11HttpClient.newBuilder().executor(httpExecutor).build()\n    try {\n        runExample {\n            // We use `.await()` for interoperability with `CompletableFuture`.\n            val response = client.sendAsync(\n                JDK11HttpRequest.newBuilder(URI("http://localhost:8080")).build(),\n                JDK11HttpResponse.BodyHandlers.ofString()\n            ).await()\n            // Return status code.\n            response.statusCode()\n        }\n    } finally {\n        httpExecutor.shutdown()\n    }\n}\n
Run Code Online (Sandbox Code Playgroud)\n

3. withKtorHttpClient()使用Ktor,一个用 Kotlin 和协程编写的非阻塞 I/O HTTP 客户端。

\n
import io.ktor.client.engine.cio.CIO\nimport io.ktor.client.HttpClient as KtorClient\nimport io.ktor.client.request.get\nimport io.ktor.client.statement.HttpResponse as KtorHttpResponse\n\nfun withKtorHttpClient() {\n    println("Running with Ktor client")\n\n    // Non-blocking I/O does not imply unlimited connections to a host.\n    // You are still limited by the number of ephemeral ports (an other limits like file descriptors).\n    // With no configurable thread limit, you can configure the max number of connections.\n    // Note that HTTP/2 allows concurrent requests with a single connection.\n    KtorClient(CIO) { engine { maxConnectionsCount = 128 } }.use { client ->\n        runExample {\n            // KtorClient.get() is a suspend fun, so suspension is implicit here\n            val response = client.get<KtorHttpResponse>("http://localhost:8080")\n            // Return status code.\n            response.status.value\n        }\n    }\n}\n
Run Code Online (Sandbox Code Playgroud)\n

把它们放在一起:

\n
import kotlin.system.measureTimeMillis\nimport kotlinx.coroutines.Deferred\nimport kotlinx.coroutines.asCoroutineDispatcher\nimport kotlinx.coroutines.async\nimport kotlinx.coroutines.awaitAll\nimport kotlinx.coroutines.runBlocking\n\nfun runExample(block: suspend () -> Int) {\n    var successCount = 0\n    var failCount = 0\n\n    Executors.newSingleThreadExecutor().asCoroutineDispatcher().use { dispatcher ->\n        measureTimeMillis {\n            runBlocking(dispatcher) {\n                val responses = mutableListOf<Deferred<Int>>()\n                repeat(128) { responses += async { block() } }\n                responses.awaitAll().forEach {\n                    if (it in 200..399) {\n                        ++successCount\n                    } else {\n                        ++failCount\n                    }\n                }\n            }\n        }.also {\n            println("Successfully sent ${success + fail} requests in ${it}ms: $successCount were successful and $failCount failed.")\n        }\n    }\n}\n\nfun main() {\n    withJdkClientBlocking()\n    withJdkClientNonBlocking()\n    withKtorHttpClient()\n}\n
Run Code Online (Sandbox Code Playgroud)\n
\n

运行示例:

\n

main.kt(用于# comments澄清)

\n
# There were ~6,454ms of overhead in this execution\nRunning with JDK11 client using blocking send()\nSuccessfully sent 128 requests in 16454ms: 128 were successful and 0 failed.\n\n# There were ~203ms of overhead in this execution\nRunning with JDK11 client using non-blocking sendAsync()\nSuccessfully sent 128 requests in 5203ms: 128 were successful and 0 failed.\n\n# There were ~862ms of overhead in this execution\nRunning with Ktor client\nSuccessfully sent 128 requests in 5862ms: 128 were successful and 0 failed.\n
Run Code Online (Sandbox Code Playgroud)\n

server.js(用于# comments澄清)

\n
# These are the requests from JDK11\'s HttpClient blocking I/O.\n# Notice how we only receive 64 requests at a time.\n# This is because Dispatchers.IO has a limit of 64 threads by default, so main.kt can\'t send anymore requests until those are done and the Dispatchers.IO threads are released.\n2022-07-24T17:59:29.107Z [1] GET /\n(...)\n2022-07-24T17:59:29.218Z [64] GET /\n2022-07-24T17:59:34.124Z [1] done!\n(...)\n2022-07-24T17:59:34.219Z [64] done!\n2022-07-24T17:59:35.618Z [65] GET /\n(...)\n2022-07-24T17:59:35.653Z [128] GET /\n2022-07-24T17:59:40.624Z [65] done!\n(...)\n2022-07-24T17:59:40.655Z [128] done!\n\n# These are the requests from JDK11\'s HttpClient non-blocking I/O.\n# Notice how we receive all 128 requests at once.\n2022-07-24T17:59:41.163Z [129] GET /\n(...)\n2022-07-24T17:59:41.257Z [256] GET /\n2022-07-24T17:59:46.170Z [129] done!\n(...)\n2022-07-24T17:59:46.276Z [256] done!\n\n# These are there requests from Ktor\'s HTTP client non-blocking I/O.\n# Notice how we also receive all 128 requests at once.\n2022-07-24T17:59:46.869Z [257] GET /\n(...)\n2022-07-24T17:59:46.918Z [384] GET /\n2022-07-24T17:59:51.874Z [257] done!\n(...)\n2022-07-24T17:59:51.921Z [384] done!\n
Run Code Online (Sandbox Code Playgroud)\n