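I am using Java Corretto 21.0.0.35.1 (build 21+35-LTS) and the built-in Java HTTP client to retrieve an InputStream. I issue parallel requests from virtual threads, and most of the time this works well. Occasionally, however, my tests hit a "pinning" event, as shown in the stack trace below.
I believe the JDK has been updated to fully support virtual threads, and as I understand it, the HTTP client should not pin the carrier thread at all. Yet the pinning appears to happen while the InputStream is being read and (auto-)closed.
Is this expected behavior, or is it still a bug in the JDK?
Code: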
HttpResponse<InputStream> response = httpClient.send(request, HttpResponse.BodyHandlers.ofInputStream());
try (InputStream responseBody = response.body()) {
    return parser.parse(responseBody); // LINE 52 in the trace below
}
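For readers who want to experiment with the setup described above, here is a minimal, self-contained sketch of parallel requests issued from virtual threads, each reading the response body as an InputStream. It is not the original test: the class name `ParallelFetchSketch`, the local throwaway server, and the `/hello` endpoint are assumptions made purely for illustration, and the sketch may or may not trigger the pinning event in the trace.

```java
import com.sun.net.httpserver.HttpServer;
import java.io.InputStream;
import java.io.OutputStream;
import java.net.InetSocketAddress;
import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

public class ParallelFetchSketch {

    /** Sends {@code count} GET requests in parallel, one virtual thread per request. */
    static List<String> fetchAll(HttpClient client, URI uri, int count) throws Exception {
        HttpRequest request = HttpRequest.newBuilder(uri).GET().build();
        try (ExecutorService executor = Executors.newVirtualThreadPerTaskExecutor()) {
            List<Future<String>> futures = new ArrayList<>();
            for (int i = 0; i < count; i++) {
                futures.add(executor.submit(() -> {
                    HttpResponse<InputStream> response =
                            client.send(request, HttpResponse.BodyHandlers.ofInputStream());
                    // Reading and auto-closing the body happens on the virtual thread.
                    try (InputStream body = response.body()) {
                        return new String(body.readAllBytes(), StandardCharsets.UTF_8);
                    }
                }));
            }
            List<String> results = new ArrayList<>();
            for (Future<String> future : futures) {
                results.add(future.get());
            }
            return results;
        }
    }

    /** Starts a throwaway local server (hypothetical /hello endpoint), fetches from it, then stops it. */
    static List<String> demo(int count) throws Exception {
        HttpServer server = HttpServer.create(new InetSocketAddress("localhost", 0), 0);
        server.createContext("/hello", exchange -> {
            byte[] payload = "hello".getBytes(StandardCharsets.UTF_8);
            exchange.sendResponseHeaders(200, payload.length);
            try (OutputStream out = exchange.getResponseBody()) {
                out.write(payload);
            }
        });
        server.start();
        try {
            URI uri = URI.create("http://localhost:" + server.getAddress().getPort() + "/hello");
            return fetchAll(HttpClient.newHttpClient(), uri, count);
        } finally {
            server.stop(0);
        }
    }

    public static void main(String[] args) throws Exception {
        System.out.println(demo(8));
    }
}
```

On JDK 21, running such a program with the system property `-Djdk.tracePinnedThreads=full` prints a stack trace whenever a virtual thread blocks while pinned, which is one way to look for events like the one below without a dedicated test harness.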
Trace:
* Pinning event captured:
java.lang.VirtualThread.parkOnCarrierThread(java.lang.VirtualThread.java:687)
java.lang.VirtualThread.park(java.lang.VirtualThread.java:603)
java.lang.System$2.parkVirtualThread(java.lang.System$2.java:2639)
jdk.internal.misc.VirtualThreads.park(jdk.internal.misc.VirtualThreads.java:54)
java.util.concurrent.locks.LockSupport.park(java.util.concurrent.locks.LockSupport.java:219)
java.util.concurrent.locks.AbstractQueuedSynchronizer.acquire(java.util.concurrent.locks.AbstractQueuedSynchronizer.java:754)
java.util.concurrent.locks.AbstractQueuedSynchronizer.acquire(java.util.concurrent.locks.AbstractQueuedSynchronizer.java:990)
java.util.concurrent.locks.ReentrantLock$Sync.lock(java.util.concurrent.locks.ReentrantLock$Sync.java:153)
java.util.concurrent.locks.ReentrantLock.lock(java.util.concurrent.locks.ReentrantLock.java:322)
sun.nio.ch.SocketChannelImpl.implCloseNonBlockingMode(sun.nio.ch.SocketChannelImpl.java:1091)
sun.nio.ch.SocketChannelImpl.implCloseSelectableChannel(sun.nio.ch.SocketChannelImpl.java:1124)
java.nio.channels.spi.AbstractSelectableChannel.implCloseChannel(java.nio.channels.spi.AbstractSelectableChannel.java:258)
java.nio.channels.spi.AbstractInterruptibleChannel.close(java.nio.channels.spi.AbstractInterruptibleChannel.java:113)
jdk.internal.net.http.PlainHttpConnection.close(jdk.internal.net.http.PlainHttpConnection.java:427)
jdk.internal.net.http.PlainHttpConnection.close(jdk.internal.net.http.PlainHttpConnection.java:406)
jdk.internal.net.http.Http1Response.lambda$readBody$1(jdk.internal.net.http.Http1Response.java:355)
jdk.internal.net.http.Http1Response$$Lambda+0x00007f4cb5e6c438.749276779.accept(jdk.internal.net.http.Http1Response$$Lambda+0x00007f4cb5e6c438.749276779.java:-1)
jdk.internal.net.http.ResponseContent$ChunkedBodyParser.onError(jdk.internal.net.http.ResponseContent$ChunkedBodyParser.java:185)
jdk.internal.net.http.Http1Response$BodyReader.onReadError(jdk.internal.net.http.Http1Response$BodyReader.java:677)
jdk.internal.net.http.Http1AsyncReceiver.checkForErrors(jdk.internal.net.http.Http1AsyncReceiver.java:302)
jdk.internal.net.http.Http1AsyncReceiver.flush(jdk.internal.net.http.Http1AsyncReceiver.java:268)
jdk.internal.net.http.Http1AsyncReceiver$$Lambda+0x00007f4cb5e31228.555093431.run(jdk.internal.net.http.Http1AsyncReceiver$$Lambda+0x00007f4cb5e31228.555093431.java:-1)
jdk.internal.net.http.common.SequentialScheduler$LockingRestartableTask.run(jdk.internal.net.http.common.SequentialScheduler$LockingRestartableTask.java:182)
jdk.internal.net.http.common.SequentialScheduler$CompleteRestartableTask.run(jdk.internal.net.http.common.SequentialScheduler$CompleteRestartableTask.java:149)
jdk.internal.net.http.common.SequentialScheduler$SchedulableTask.run(jdk.internal.net.http.common.SequentialScheduler$SchedulableTask.java:207)
jdk.internal.net.http.HttpClientImpl$DelegatingExecutor.execute(jdk.internal.net.http.HttpClientImpl$DelegatingExecutor.java:177)
jdk.internal.net.http.common.SequentialScheduler.runOrSchedule(jdk.internal.net.http.common.SequentialScheduler.java:282)
jdk.internal.net.http.common.SequentialScheduler.runOrSchedule(jdk.internal.net.http.common.SequentialScheduler.java:251)
jdk.internal.net.http.Http1AsyncReceiver.onReadError(jdk.internal.net.http.Http1AsyncReceiver.java:516)
jdk.internal.net.http.Http1AsyncReceiver.lambda$handlePendingDelegate$3(jdk.internal.net.http.Http1AsyncReceiver.java:380)
jdk.internal.net.http.Http1AsyncReceiver$$Lambda+0x00007f4cb5e33ca0.84679411.run(jdk.internal.net.http.Http1AsyncReceiver$$Lambda+0x00007f4cb5e33ca0.84679411.java:-1) …

With Java 21, blocking IO code can be turned into non-blocking code simply by running it in a virtual thread.
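Should I simply wrap the HTTP call that returns an InputStream (as in method nonBlockingA), or would it be more efficient to also perform the reading and deserialization of the InputStream in the virtual thread (as in method nonBlockingB)?
In other words, is reading from the InputStream a blocking IO operation?
Keep in mind that the response can be very large and may contain more than 500,000 strings. I am also not sure whether the libraries involved use any ThreadLocals, which are not recommended with virtual threads.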
@SuppressWarnings("unchecked")
class Request {
    private final ExecutorService executorService = Executors.newVirtualThreadPerTaskExecutor();
    private CloseableHttpClient httpApacheClient;

    // Variant A: only the HTTP call runs on a virtual thread;
    // the stream is read and deserialized on the calling thread.
    List<String> nonBlockingA() throws Exception {
        InputStream bigInputStream = executorService.submit(this::getResponse).get();
        return deserialize(bigInputStream);
    }

    // Variant B: the HTTP call, the read and the deserialization all run on the virtual thread.
    List<String> nonBlockingB() throws Exception {
        return executorService.submit(() -> {
            InputStream bigInputStream = getResponse();
            return deserialize(bigInputStream);
        }).get();
    }

    private InputStream getResponse() throws IOException {
        return httpApacheClient.execute(new HttpGet("http://random/names/size/500000")).getEntity().getContent(); …
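
To make the difference between the two variants concrete, the following sketch replaces the HTTP call with an in-memory stream that records which kind of thread reads from it. Everything here is a stand-in chosen for illustration: `ReadThreadDemo`, `TrackingStream`, `variantA`, `variantB`, and the line-splitting `deserialize` are hypothetical and not part of the code above, and no Apache HttpClient is involved. The sketch assumes the caller is a platform thread.

```java
import java.io.IOException;
import java.io.InputStream;
import java.nio.charset.StandardCharsets;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class ReadThreadDemo {

    static final ExecutorService EXECUTOR = Executors.newVirtualThreadPerTaskExecutor();

    /** Hypothetical stand-in for a response body; remembers whether it was last read on a virtual thread. */
    static final class TrackingStream extends InputStream {
        private final byte[] data;
        private int pos;
        volatile boolean readOnVirtualThread;

        TrackingStream(String content) {
            this.data = content.getBytes(StandardCharsets.UTF_8);
        }

        @Override
        public int read() {
            // The read executes on whichever thread calls it, not on the thread that created the stream.
            readOnVirtualThread = Thread.currentThread().isVirtual();
            return pos < data.length ? (data[pos++] & 0xFF) : -1;
        }
    }

    /** Stand-in for the HTTP call: returns a stream without consuming it. */
    static TrackingStream getResponse() {
        return new TrackingStream("alice\nbob\ncarol\n");
    }

    /** Stand-in deserializer: one string per line. */
    static List<String> deserialize(InputStream in) throws IOException {
        try (in) {
            return new String(in.readAllBytes(), StandardCharsets.UTF_8).lines().toList();
        }
    }

    /** Like nonBlockingA: only obtaining the stream runs on a virtual thread. */
    static boolean variantA() throws Exception {
        TrackingStream body = EXECUTOR.submit(ReadThreadDemo::getResponse).get();
        deserialize(body); // reading happens here, on the caller's thread
        return body.readOnVirtualThread;
    }

    /** Like nonBlockingB: obtaining, reading and deserializing all run on a virtual thread. */
    static boolean variantB() throws Exception {
        return EXECUTOR.submit(() -> {
            TrackingStream body = getResponse();
            deserialize(body);
            return body.readOnVirtualThread;
        }).get();
    }

    public static void main(String[] args) throws Exception {
        System.out.println("variant A read on virtual thread: " + variantA());
        System.out.println("variant B read on virtual thread: " + variantB());
    }
}
```

When called from a platform thread, variantA reports false and variantB reports true: in the first variant the stream is consumed by the caller, so any blocking during the read blocks the caller's thread rather than a virtual thread.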