小编Urb*_*Urb的帖子

Java 21内置HTTP客户端固定载体线程

我正在使用 Java Corretto 21.0.0.35.1 build 21+35-LTS和内置 Java HTTP 客户端来检索InputStream. 我正在使用虚拟线程发出并行请求,并且在大多数情况下,它运行良好。然而,有时,我的测试会遇到“固定”事件,如下面的堆栈跟踪所示。

我相信 JDK 已经更新为完全支持虚拟线程,并且根据我的理解,HTTP 客户端根本不应该固定承载线程。但是,似乎在读取并(自动)关闭InputStream.

这是预期的行为吗?或者它仍然是 JDK 中的一个错误吗?

代码:

HttpResponse<InputStream> response = httpClient.send(request, HttpResponse.BodyHandlers.ofInputStream());
try (InputStream responseBody = response.body()) {
  return parser.parse(responseBody); // LINE 52 in the trace below
}
Run Code Online (Sandbox Code Playgroud)

踪迹

* Pinning event captured:
  java.lang.VirtualThread.parkOnCarrierThread(java.lang.VirtualThread.java:687)
  java.lang.VirtualThread.park(java.lang.VirtualThread.java:603)
  java.lang.System$2.parkVirtualThread(java.lang.System$2.java:2639)
  jdk.internal.misc.VirtualThreads.park(jdk.internal.misc.VirtualThreads.java:54)
  java.util.concurrent.locks.LockSupport.park(java.util.concurrent.locks.LockSupport.java:219)
  java.util.concurrent.locks.AbstractQueuedSynchronizer.acquire(java.util.concurrent.locks.AbstractQueuedSynchronizer.java:754)
  java.util.concurrent.locks.AbstractQueuedSynchronizer.acquire(java.util.concurrent.locks.AbstractQueuedSynchronizer.java:990)
  java.util.concurrent.locks.ReentrantLock$Sync.lock(java.util.concurrent.locks.ReentrantLock$Sync.java:153)
  java.util.concurrent.locks.ReentrantLock.lock(java.util.concurrent.locks.ReentrantLock.java:322)
  sun.nio.ch.SocketChannelImpl.implCloseNonBlockingMode(sun.nio.ch.SocketChannelImpl.java:1091)
  sun.nio.ch.SocketChannelImpl.implCloseSelectableChannel(sun.nio.ch.SocketChannelImpl.java:1124)
  java.nio.channels.spi.AbstractSelectableChannel.implCloseChannel(java.nio.channels.spi.AbstractSelectableChannel.java:258)
  java.nio.channels.spi.AbstractInterruptibleChannel.close(java.nio.channels.spi.AbstractInterruptibleChannel.java:113)
  jdk.internal.net.http.PlainHttpConnection.close(jdk.internal.net.http.PlainHttpConnection.java:427)
  jdk.internal.net.http.PlainHttpConnection.close(jdk.internal.net.http.PlainHttpConnection.java:406)
  jdk.internal.net.http.Http1Response.lambda$readBody$1(jdk.internal.net.http.Http1Response.java:355)
  jdk.internal.net.http.Http1Response$$Lambda+0x00007f4cb5e6c438.749276779.accept(jdk.internal.net.http.Http1Response$$Lambda+0x00007f4cb5e6c438.749276779.java:-1)
  jdk.internal.net.http.ResponseContent$ChunkedBodyParser.onError(jdk.internal.net.http.ResponseContent$ChunkedBodyParser.java:185)
  jdk.internal.net.http.Http1Response$BodyReader.onReadError(jdk.internal.net.http.Http1Response$BodyReader.java:677)
  jdk.internal.net.http.Http1AsyncReceiver.checkForErrors(jdk.internal.net.http.Http1AsyncReceiver.java:302)
  jdk.internal.net.http.Http1AsyncReceiver.flush(jdk.internal.net.http.Http1AsyncReceiver.java:268)
  jdk.internal.net.http.Http1AsyncReceiver$$Lambda+0x00007f4cb5e31228.555093431.run(jdk.internal.net.http.Http1AsyncReceiver$$Lambda+0x00007f4cb5e31228.555093431.java:-1)
  jdk.internal.net.http.common.SequentialScheduler$LockingRestartableTask.run(jdk.internal.net.http.common.SequentialScheduler$LockingRestartableTask.java:182)
  jdk.internal.net.http.common.SequentialScheduler$CompleteRestartableTask.run(jdk.internal.net.http.common.SequentialScheduler$CompleteRestartableTask.java:149)
  jdk.internal.net.http.common.SequentialScheduler$SchedulableTask.run(jdk.internal.net.http.common.SequentialScheduler$SchedulableTask.java:207)
  jdk.internal.net.http.HttpClientImpl$DelegatingExecutor.execute(jdk.internal.net.http.HttpClientImpl$DelegatingExecutor.java:177)
  jdk.internal.net.http.common.SequentialScheduler.runOrSchedule(jdk.internal.net.http.common.SequentialScheduler.java:282)
  jdk.internal.net.http.common.SequentialScheduler.runOrSchedule(jdk.internal.net.http.common.SequentialScheduler.java:251)
  jdk.internal.net.http.Http1AsyncReceiver.onReadError(jdk.internal.net.http.Http1AsyncReceiver.java:516)
  jdk.internal.net.http.Http1AsyncReceiver.lambda$handlePendingDelegate$3(jdk.internal.net.http.Http1AsyncReceiver.java:380)
  jdk.internal.net.http.Http1AsyncReceiver$$Lambda+0x00007f4cb5e33ca0.84679411.run(jdk.internal.net.http.Http1AsyncReceiver$$Lambda+0x00007f4cb5e33ca0.84679411.java:-1) …
Run Code Online (Sandbox Code Playgroud)

java java-http-client project-loom virtual-threads java-21

19
推荐指数
1
解决办法
2406
查看次数

用于从 HTTP 响应读取 InputStream 的虚拟线程

使用 java 21,只需在虚拟线程中执行即可将阻塞 IO 代码转换为非阻塞代码。

我应该简单地包装返回 an 的 HTTP 调用InputStream(如 method 中),还是在虚拟线程中nonBlockingA执行 the 的读取和反序列化(如 method 中)会更有效?InputStreamnonBlockingB

也就是说,读取是InputStream阻塞IO操作吗?

请记住,响应可能非常大,可能包含超过 500,000 个字符串。我也不确定所使用的库是否使用任何 ThreadLocals,这不推荐用于虚拟线程


@SuppressWarnings("unchecked")
class Request {
    private final ExecutorService executorService = Executors.newVirtualThreadPerTaskExecutor();
    private CloseableHttpClient httpApacheClient;

    List<String> nonBlockingA() throws Exception {
        InputStream bigInputStream = executorService.submit(this::getResponse).get();
        return deserialize(bigInputStream);
    }

    List<String> nonBlockingB() throws Exception {
        return executorService.submit(() -> {
            InputStream bigInputStream = getResponse();
            return deserialize(bigInputStream);
        }).get();
    }

    private InputStream getResponse() throws IOException {
        return httpApacheClient.execute(new HttpGet("http://random/names/size/500000")).getEntity().getContent(); …
Run Code Online (Sandbox Code Playgroud)

java inputstream nonblocking thread-local java-21

4
推荐指数
1
解决办法
616
查看次数