Urb*_*Urb 19 java java-http-client project-loom virtual-threads java-21
我正在使用 Java Corretto 21.0.0.35.1 build 21+35-LTS和内置 Java HTTP 客户端来检索InputStream. 我正在使用虚拟线程发出并行请求,并且在大多数情况下,它运行良好。然而,有时,我的测试会遇到“固定”事件,如下面的堆栈跟踪所示。
我相信 JDK 已经更新为完全支持虚拟线程,并且根据我的理解,HTTP 客户端根本不应该固定承载线程。但是,似乎在读取并(自动)关闭InputStream.
这是预期的行为吗?或者它仍然是 JDK 中的一个错误吗?
代码:
HttpResponse<InputStream> response = httpClient.send(request, HttpResponse.BodyHandlers.ofInputStream());
try (InputStream responseBody = response.body()) {
return parser.parse(responseBody); // LINE 52 in the trace below
}
Run Code Online (Sandbox Code Playgroud)
踪迹
* Pinning event captured:
java.lang.VirtualThread.parkOnCarrierThread(java.lang.VirtualThread.java:687)
java.lang.VirtualThread.park(java.lang.VirtualThread.java:603)
java.lang.System$2.parkVirtualThread(java.lang.System$2.java:2639)
jdk.internal.misc.VirtualThreads.park(jdk.internal.misc.VirtualThreads.java:54)
java.util.concurrent.locks.LockSupport.park(java.util.concurrent.locks.LockSupport.java:219)
java.util.concurrent.locks.AbstractQueuedSynchronizer.acquire(java.util.concurrent.locks.AbstractQueuedSynchronizer.java:754)
java.util.concurrent.locks.AbstractQueuedSynchronizer.acquire(java.util.concurrent.locks.AbstractQueuedSynchronizer.java:990)
java.util.concurrent.locks.ReentrantLock$Sync.lock(java.util.concurrent.locks.ReentrantLock$Sync.java:153)
java.util.concurrent.locks.ReentrantLock.lock(java.util.concurrent.locks.ReentrantLock.java:322)
sun.nio.ch.SocketChannelImpl.implCloseNonBlockingMode(sun.nio.ch.SocketChannelImpl.java:1091)
sun.nio.ch.SocketChannelImpl.implCloseSelectableChannel(sun.nio.ch.SocketChannelImpl.java:1124)
java.nio.channels.spi.AbstractSelectableChannel.implCloseChannel(java.nio.channels.spi.AbstractSelectableChannel.java:258)
java.nio.channels.spi.AbstractInterruptibleChannel.close(java.nio.channels.spi.AbstractInterruptibleChannel.java:113)
jdk.internal.net.http.PlainHttpConnection.close(jdk.internal.net.http.PlainHttpConnection.java:427)
jdk.internal.net.http.PlainHttpConnection.close(jdk.internal.net.http.PlainHttpConnection.java:406)
jdk.internal.net.http.Http1Response.lambda$readBody$1(jdk.internal.net.http.Http1Response.java:355)
jdk.internal.net.http.Http1Response$$Lambda+0x00007f4cb5e6c438.749276779.accept(jdk.internal.net.http.Http1Response$$Lambda+0x00007f4cb5e6c438.749276779.java:-1)
jdk.internal.net.http.ResponseContent$ChunkedBodyParser.onError(jdk.internal.net.http.ResponseContent$ChunkedBodyParser.java:185)
jdk.internal.net.http.Http1Response$BodyReader.onReadError(jdk.internal.net.http.Http1Response$BodyReader.java:677)
jdk.internal.net.http.Http1AsyncReceiver.checkForErrors(jdk.internal.net.http.Http1AsyncReceiver.java:302)
jdk.internal.net.http.Http1AsyncReceiver.flush(jdk.internal.net.http.Http1AsyncReceiver.java:268)
jdk.internal.net.http.Http1AsyncReceiver$$Lambda+0x00007f4cb5e31228.555093431.run(jdk.internal.net.http.Http1AsyncReceiver$$Lambda+0x00007f4cb5e31228.555093431.java:-1)
jdk.internal.net.http.common.SequentialScheduler$LockingRestartableTask.run(jdk.internal.net.http.common.SequentialScheduler$LockingRestartableTask.java:182)
jdk.internal.net.http.common.SequentialScheduler$CompleteRestartableTask.run(jdk.internal.net.http.common.SequentialScheduler$CompleteRestartableTask.java:149)
jdk.internal.net.http.common.SequentialScheduler$SchedulableTask.run(jdk.internal.net.http.common.SequentialScheduler$SchedulableTask.java:207)
jdk.internal.net.http.HttpClientImpl$DelegatingExecutor.execute(jdk.internal.net.http.HttpClientImpl$DelegatingExecutor.java:177)
jdk.internal.net.http.common.SequentialScheduler.runOrSchedule(jdk.internal.net.http.common.SequentialScheduler.java:282)
jdk.internal.net.http.common.SequentialScheduler.runOrSchedule(jdk.internal.net.http.common.SequentialScheduler.java:251)
jdk.internal.net.http.Http1AsyncReceiver.onReadError(jdk.internal.net.http.Http1AsyncReceiver.java:516)
jdk.internal.net.http.Http1AsyncReceiver.lambda$handlePendingDelegate$3(jdk.internal.net.http.Http1AsyncReceiver.java:380)
jdk.internal.net.http.Http1AsyncReceiver$$Lambda+0x00007f4cb5e33ca0.84679411.run(jdk.internal.net.http.Http1AsyncReceiver$$Lambda+0x00007f4cb5e33ca0.84679411.java:-1)
jdk.internal.net.http.Http1AsyncReceiver$Http1AsyncDelegateSubscription.cancel(jdk.internal.net.http.Http1AsyncReceiver$Http1AsyncDelegateSubscription.java:163)
jdk.internal.net.http.common.HttpBodySubscriberWrapper$SubscriptionWrapper.cancel(jdk.internal.net.http.common.HttpBodySubscriberWrapper$SubscriptionWrapper.java:92)
jdk.internal.net.http.ResponseSubscribers$HttpResponseInputStream.close(jdk.internal.net.http.ResponseSubscribers$HttpResponseInputStream.java:653)
com.acme.service.server.StatusClient.getResponse(com.acme.service.server.StatusClient.java:52)
com.acme.service.server.StatusClient_ClientProxy.getResponse(com.acme.service.server.StatusClient_ClientProxy.java:-1)
com.acme.client.Request.execute(com.acme.client.Request.java:96)
com.acme.service.server.serviceStatusProvider.getStatusHistorys(com.acme.service.server.serviceStatusProvider.java:237)
com.acme.service.api.RemoteStatusCheck.getStatusHistory(com.acme.service.api.RemoteStatusCheck.java:163)
com.acme.service.api.RemoteStatusCheck.lambda$doChecks$0(com.acme.service.api.RemoteStatusCheck.java:132)
com.acme.service.api.RemoteStatusCheck$$Lambda+0x00007f4cb9f0d8d0.979953307.call(com.acme.service.api.RemoteStatusCheck$$Lambda+0x00007f4cb9f0d8d0.979953307.java:-1)
java.util.concurrent.FutureTask.run(java.util.concurrent.FutureTask.java:317)
java.lang.VirtualThread.runWith(java.lang.VirtualThread.java:341)
java.lang.VirtualThread.run(java.lang.VirtualThread.java:311)
java.lang.VirtualThread$VThreadContinuation$1.run(java.lang.VirtualThread$VThreadContinuation$1.java:192)
jdk.internal.vm.Continuation.enter0(jdk.internal.vm.Continuation.java:320)
jdk.internal.vm.Continuation.enter(jdk.internal.vm.Continuation.java:312)
jdk.internal.vm.Continuation.enterSpecial(jdk.internal.vm.Continuation.java:-1)
Run Code Online (Sandbox Code Playgroud)
Mar*_*eel 23
该方法java.nio.channels.spi.AbstractInterruptibleChannel.close()(Temurin-21+35(构建 21+35-LTS)中的第 108 - 115 行,但可能是所有 OpenJDK 衍生物)实现为:
public final void close() throws IOException {
synchronized (closeLock) {
if (closed)
return;
closed = true;
implCloseChannel();
}
}
Run Code Online (Sandbox Code Playgroud)
堆栈跟踪中的第 113 行对应于该implCloseChannel()调用,该调用也对应于堆栈跟踪中的上一行,并且位于该同步块的中间。如果虚拟线程在块中停放/阻塞synchronized,则它们将被固定,这就是它被固定的原因。
换句话说,鉴于代码本身,固定是预期且正确的行为,因此不是错误。
我不知道这里的使用是否synchronized是在删除 JDK 中的同步块时的疏忽,或者是否有特定原因仍然使用。synchronized鉴于它是一个私有锁对象,我想应该可以通过将其替换为 a 或类似对象来摆脱它(即它不是通道“API”的一部分)ReentrantLock,但也许还有其他实现原因暂时保留这个。
我在线程的 nio-dev 列表中询问过这个问题AbstractInterruptibleChannel.close() 是否仍然使用同步块?
艾伦·贝特曼对此作出回应:
我们认为这不值得做,因为很少设置 SO_LINGER。关闭时由于 readLock 或 writeLock 争用而导致的临时固定是可以的。
与此同时,我们正在努力消除对同步块的限制。我们希望很快在 Loom 仓库中能有一些东西。