Java 21内置HTTP客户端固定载体线程

Urb*_*Urb 19 java java-http-client project-loom virtual-threads java-21

我正在使用 Java Corretto 21.0.0.35.1 build 21+35-LTS和内置 Java HTTP 客户端来检索InputStream. 我正在使用虚拟线程发出并行请求,并且在大多数情况下,它运行良好。然而,有时,我的测试会遇到“固定”事件,如下面的堆栈跟踪所示。

我相信 JDK 已经更新为完全支持虚拟线程,并且根据我的理解,HTTP 客户端根本不应该固定承载线程。但是,似乎在读取并(自动)关闭InputStream.

这是预期的行为吗?或者它仍然是 JDK 中的一个错误吗?

代码:

HttpResponse<InputStream> response = httpClient.send(request, HttpResponse.BodyHandlers.ofInputStream());
try (InputStream responseBody = response.body()) {
  return parser.parse(responseBody); // LINE 52 in the trace below
}
Run Code Online (Sandbox Code Playgroud)

踪迹

* Pinning event captured:
  java.lang.VirtualThread.parkOnCarrierThread(java.lang.VirtualThread.java:687)
  java.lang.VirtualThread.park(java.lang.VirtualThread.java:603)
  java.lang.System$2.parkVirtualThread(java.lang.System$2.java:2639)
  jdk.internal.misc.VirtualThreads.park(jdk.internal.misc.VirtualThreads.java:54)
  java.util.concurrent.locks.LockSupport.park(java.util.concurrent.locks.LockSupport.java:219)
  java.util.concurrent.locks.AbstractQueuedSynchronizer.acquire(java.util.concurrent.locks.AbstractQueuedSynchronizer.java:754)
  java.util.concurrent.locks.AbstractQueuedSynchronizer.acquire(java.util.concurrent.locks.AbstractQueuedSynchronizer.java:990)
  java.util.concurrent.locks.ReentrantLock$Sync.lock(java.util.concurrent.locks.ReentrantLock$Sync.java:153)
  java.util.concurrent.locks.ReentrantLock.lock(java.util.concurrent.locks.ReentrantLock.java:322)
  sun.nio.ch.SocketChannelImpl.implCloseNonBlockingMode(sun.nio.ch.SocketChannelImpl.java:1091)
  sun.nio.ch.SocketChannelImpl.implCloseSelectableChannel(sun.nio.ch.SocketChannelImpl.java:1124)
  java.nio.channels.spi.AbstractSelectableChannel.implCloseChannel(java.nio.channels.spi.AbstractSelectableChannel.java:258)
  java.nio.channels.spi.AbstractInterruptibleChannel.close(java.nio.channels.spi.AbstractInterruptibleChannel.java:113)
  jdk.internal.net.http.PlainHttpConnection.close(jdk.internal.net.http.PlainHttpConnection.java:427)
  jdk.internal.net.http.PlainHttpConnection.close(jdk.internal.net.http.PlainHttpConnection.java:406)
  jdk.internal.net.http.Http1Response.lambda$readBody$1(jdk.internal.net.http.Http1Response.java:355)
  jdk.internal.net.http.Http1Response$$Lambda+0x00007f4cb5e6c438.749276779.accept(jdk.internal.net.http.Http1Response$$Lambda+0x00007f4cb5e6c438.749276779.java:-1)
  jdk.internal.net.http.ResponseContent$ChunkedBodyParser.onError(jdk.internal.net.http.ResponseContent$ChunkedBodyParser.java:185)
  jdk.internal.net.http.Http1Response$BodyReader.onReadError(jdk.internal.net.http.Http1Response$BodyReader.java:677)
  jdk.internal.net.http.Http1AsyncReceiver.checkForErrors(jdk.internal.net.http.Http1AsyncReceiver.java:302)
  jdk.internal.net.http.Http1AsyncReceiver.flush(jdk.internal.net.http.Http1AsyncReceiver.java:268)
  jdk.internal.net.http.Http1AsyncReceiver$$Lambda+0x00007f4cb5e31228.555093431.run(jdk.internal.net.http.Http1AsyncReceiver$$Lambda+0x00007f4cb5e31228.555093431.java:-1)
  jdk.internal.net.http.common.SequentialScheduler$LockingRestartableTask.run(jdk.internal.net.http.common.SequentialScheduler$LockingRestartableTask.java:182)
  jdk.internal.net.http.common.SequentialScheduler$CompleteRestartableTask.run(jdk.internal.net.http.common.SequentialScheduler$CompleteRestartableTask.java:149)
  jdk.internal.net.http.common.SequentialScheduler$SchedulableTask.run(jdk.internal.net.http.common.SequentialScheduler$SchedulableTask.java:207)
  jdk.internal.net.http.HttpClientImpl$DelegatingExecutor.execute(jdk.internal.net.http.HttpClientImpl$DelegatingExecutor.java:177)
  jdk.internal.net.http.common.SequentialScheduler.runOrSchedule(jdk.internal.net.http.common.SequentialScheduler.java:282)
  jdk.internal.net.http.common.SequentialScheduler.runOrSchedule(jdk.internal.net.http.common.SequentialScheduler.java:251)
  jdk.internal.net.http.Http1AsyncReceiver.onReadError(jdk.internal.net.http.Http1AsyncReceiver.java:516)
  jdk.internal.net.http.Http1AsyncReceiver.lambda$handlePendingDelegate$3(jdk.internal.net.http.Http1AsyncReceiver.java:380)
  jdk.internal.net.http.Http1AsyncReceiver$$Lambda+0x00007f4cb5e33ca0.84679411.run(jdk.internal.net.http.Http1AsyncReceiver$$Lambda+0x00007f4cb5e33ca0.84679411.java:-1)
  jdk.internal.net.http.Http1AsyncReceiver$Http1AsyncDelegateSubscription.cancel(jdk.internal.net.http.Http1AsyncReceiver$Http1AsyncDelegateSubscription.java:163)
  jdk.internal.net.http.common.HttpBodySubscriberWrapper$SubscriptionWrapper.cancel(jdk.internal.net.http.common.HttpBodySubscriberWrapper$SubscriptionWrapper.java:92)
  jdk.internal.net.http.ResponseSubscribers$HttpResponseInputStream.close(jdk.internal.net.http.ResponseSubscribers$HttpResponseInputStream.java:653)

  com.acme.service.server.StatusClient.getResponse(com.acme.service.server.StatusClient.java:52)
  com.acme.service.server.StatusClient_ClientProxy.getResponse(com.acme.service.server.StatusClient_ClientProxy.java:-1)
  com.acme.client.Request.execute(com.acme.client.Request.java:96)
  com.acme.service.server.serviceStatusProvider.getStatusHistorys(com.acme.service.server.serviceStatusProvider.java:237)
  com.acme.service.api.RemoteStatusCheck.getStatusHistory(com.acme.service.api.RemoteStatusCheck.java:163)
  com.acme.service.api.RemoteStatusCheck.lambda$doChecks$0(com.acme.service.api.RemoteStatusCheck.java:132)
  com.acme.service.api.RemoteStatusCheck$$Lambda+0x00007f4cb9f0d8d0.979953307.call(com.acme.service.api.RemoteStatusCheck$$Lambda+0x00007f4cb9f0d8d0.979953307.java:-1)
  java.util.concurrent.FutureTask.run(java.util.concurrent.FutureTask.java:317)
  java.lang.VirtualThread.runWith(java.lang.VirtualThread.java:341)
  java.lang.VirtualThread.run(java.lang.VirtualThread.java:311)
  java.lang.VirtualThread$VThreadContinuation$1.run(java.lang.VirtualThread$VThreadContinuation$1.java:192)
  jdk.internal.vm.Continuation.enter0(jdk.internal.vm.Continuation.java:320)
  jdk.internal.vm.Continuation.enter(jdk.internal.vm.Continuation.java:312)
  jdk.internal.vm.Continuation.enterSpecial(jdk.internal.vm.Continuation.java:-1)
Run Code Online (Sandbox Code Playgroud)

Mar*_*eel 23

该方法java.nio.channels.spi.AbstractInterruptibleChannel.close()(Temurin-21+35(构建 21+35-LTS)中的第 108 - 115 行,但可能是所有 OpenJDK 衍生物)实现为:

public final void close() throws IOException {
    synchronized (closeLock) {
        if (closed)
            return;
        closed = true;
        implCloseChannel();
    }
}
Run Code Online (Sandbox Code Playgroud)

堆栈跟踪中的第 113 行对应于该implCloseChannel()调用,该调用也对应于堆栈跟踪中的上一行,并且位于该同步块的中间。如果虚拟线程在块中停放/阻塞synchronized,则它们将被固定,这就是它被固定的原因。

换句话说,鉴于代码本身,固定是预期且正确的行为,因此不是错误。

我不知道这里的使用是否synchronized是在删除 JDK 中的同步块时的疏忽,或者是否有特定原因仍然使用。synchronized鉴于它是一个私有锁对象,我想应该可以通过将其替换为 a 或类似对象来摆脱它(即它不是通道“API”的一部分)ReentrantLock,但也许还有其他实现原因暂时保留这个。

我在线程的 nio-dev 列表中询问过这个问题AbstractInterruptibleChannel.close() 是否仍然使用同步块?

艾伦·贝特曼对此作出回应

我们认为这不值得做,因为很少设置 SO_LINGER。关闭时由于 readLock 或 writeLock 争用而导致的临时固定是可以的。

与此同时,我们正在努力消除对同步块的限制。我们希望很快在 Loom 仓库中能有一些东西。

  • @Urb只是一个性能问题。而且,与所有潜在的性能问题一样,只有当您遇到性能不佳时,才会出现性能问题。因此,如果您的应用程序出现实际性能问题,请打开固定事件(和其他潜在问题)的日志记录,但不要更早。 (4认同)
  • @Holger 是的。这个想法是通过集成测试捕获固定事件。事实上,当使用 Apache http 客户端时,测试显示仅在一个请求中就有数百个固定事件。因此,我们将 Apache 客户端替换为内置客户端,现在只是偶尔会出现固定事件。在这种情况下,这可能不是问题,但代码可能会发生变化,我们希望尽早意识到固定并确定这是否是真正的问题。 (3认同)