在使用 Kotlin 进行编码时,是选择其中一种更好还是另一种更好?
该视频:Java 21 新功能:虚拟线程 #RoadTo21似乎不赞成将虚拟线程用于非 IO 或非阻塞任务。
我什至在 Kotlin 代码中为 CPU 密集型任务创建了左右协程。这已经不行了吗?
我正在使用 Java Corretto 21.0.0.35.1 build 21+35-LTS和内置 Java HTTP 客户端来检索InputStream
. 我正在使用虚拟线程发出并行请求,并且在大多数情况下,它运行良好。然而,有时,我的测试会遇到“固定”事件,如下面的堆栈跟踪所示。
我相信 JDK 已经更新为完全支持虚拟线程,并且根据我的理解,HTTP 客户端根本不应该固定承载线程。但是,似乎在读取并(自动)关闭InputStream
.
这是预期的行为吗?或者它仍然是 JDK 中的一个错误吗?
代码:
HttpResponse<InputStream> response = httpClient.send(request, HttpResponse.BodyHandlers.ofInputStream());
try (InputStream responseBody = response.body()) {
return parser.parse(responseBody); // LINE 52 in the trace below
}
Run Code Online (Sandbox Code Playgroud)
踪迹
* Pinning event captured:
java.lang.VirtualThread.parkOnCarrierThread(java.lang.VirtualThread.java:687)
java.lang.VirtualThread.park(java.lang.VirtualThread.java:603)
java.lang.System$2.parkVirtualThread(java.lang.System$2.java:2639)
jdk.internal.misc.VirtualThreads.park(jdk.internal.misc.VirtualThreads.java:54)
java.util.concurrent.locks.LockSupport.park(java.util.concurrent.locks.LockSupport.java:219)
java.util.concurrent.locks.AbstractQueuedSynchronizer.acquire(java.util.concurrent.locks.AbstractQueuedSynchronizer.java:754)
java.util.concurrent.locks.AbstractQueuedSynchronizer.acquire(java.util.concurrent.locks.AbstractQueuedSynchronizer.java:990)
java.util.concurrent.locks.ReentrantLock$Sync.lock(java.util.concurrent.locks.ReentrantLock$Sync.java:153)
java.util.concurrent.locks.ReentrantLock.lock(java.util.concurrent.locks.ReentrantLock.java:322)
sun.nio.ch.SocketChannelImpl.implCloseNonBlockingMode(sun.nio.ch.SocketChannelImpl.java:1091)
sun.nio.ch.SocketChannelImpl.implCloseSelectableChannel(sun.nio.ch.SocketChannelImpl.java:1124)
java.nio.channels.spi.AbstractSelectableChannel.implCloseChannel(java.nio.channels.spi.AbstractSelectableChannel.java:258)
java.nio.channels.spi.AbstractInterruptibleChannel.close(java.nio.channels.spi.AbstractInterruptibleChannel.java:113)
jdk.internal.net.http.PlainHttpConnection.close(jdk.internal.net.http.PlainHttpConnection.java:427)
jdk.internal.net.http.PlainHttpConnection.close(jdk.internal.net.http.PlainHttpConnection.java:406)
jdk.internal.net.http.Http1Response.lambda$readBody$1(jdk.internal.net.http.Http1Response.java:355)
jdk.internal.net.http.Http1Response$$Lambda+0x00007f4cb5e6c438.749276779.accept(jdk.internal.net.http.Http1Response$$Lambda+0x00007f4cb5e6c438.749276779.java:-1)
jdk.internal.net.http.ResponseContent$ChunkedBodyParser.onError(jdk.internal.net.http.ResponseContent$ChunkedBodyParser.java:185)
jdk.internal.net.http.Http1Response$BodyReader.onReadError(jdk.internal.net.http.Http1Response$BodyReader.java:677)
jdk.internal.net.http.Http1AsyncReceiver.checkForErrors(jdk.internal.net.http.Http1AsyncReceiver.java:302)
jdk.internal.net.http.Http1AsyncReceiver.flush(jdk.internal.net.http.Http1AsyncReceiver.java:268)
jdk.internal.net.http.Http1AsyncReceiver$$Lambda+0x00007f4cb5e31228.555093431.run(jdk.internal.net.http.Http1AsyncReceiver$$Lambda+0x00007f4cb5e31228.555093431.java:-1)
jdk.internal.net.http.common.SequentialScheduler$LockingRestartableTask.run(jdk.internal.net.http.common.SequentialScheduler$LockingRestartableTask.java:182)
jdk.internal.net.http.common.SequentialScheduler$CompleteRestartableTask.run(jdk.internal.net.http.common.SequentialScheduler$CompleteRestartableTask.java:149)
jdk.internal.net.http.common.SequentialScheduler$SchedulableTask.run(jdk.internal.net.http.common.SequentialScheduler$SchedulableTask.java:207)
jdk.internal.net.http.HttpClientImpl$DelegatingExecutor.execute(jdk.internal.net.http.HttpClientImpl$DelegatingExecutor.java:177)
jdk.internal.net.http.common.SequentialScheduler.runOrSchedule(jdk.internal.net.http.common.SequentialScheduler.java:282)
jdk.internal.net.http.common.SequentialScheduler.runOrSchedule(jdk.internal.net.http.common.SequentialScheduler.java:251)
jdk.internal.net.http.Http1AsyncReceiver.onReadError(jdk.internal.net.http.Http1AsyncReceiver.java:516)
jdk.internal.net.http.Http1AsyncReceiver.lambda$handlePendingDelegate$3(jdk.internal.net.http.Http1AsyncReceiver.java:380)
jdk.internal.net.http.Http1AsyncReceiver$$Lambda+0x00007f4cb5e33ca0.84679411.run(jdk.internal.net.http.Http1AsyncReceiver$$Lambda+0x00007f4cb5e33ca0.84679411.java:-1) …
Run Code Online (Sandbox Code Playgroud) JEP-425:虚拟线程指出“应该为每个应用程序任务创建一个新的虚拟线程”,并两次提到在 JVM 中运行“数百万”虚拟线程的可能性。
相同的 JEP 意味着每个虚拟线程都可以访问其自己的线程本地值:
虚拟线程就像平台线程一样支持线程局部变量,因此它们可以运行使用线程局部变量的现有代码。
线程局部变量多次用于缓存非线程安全且创建成本昂贵的对象。JEP 警告:
但是,由于虚拟线程可能非常多,因此请在仔细考虑后使用线程局部变量。
确实很多!特别是考虑到虚拟线程不被池化(或者至少不应该被池化)。作为短期任务的代表,在虚拟线程中使用线程局部变量来缓存昂贵的对象似乎毫无意义。除非!我们可以从虚拟线程创建并访问绑定到其载体线程的线程局部变量
为了澄清起见,我想从这样的事情开始(当仅使用上限为池大小的本机线程时,这是完全可以接受的,但是当连续运行数百万个虚拟线程时,这显然不再是一个非常有效的缓存机制重新创建:
static final ThreadLocal<DateFormat> CACHED = ThreadLocal.withInitial(DateFormat::getInstance);
Run Code Online (Sandbox Code Playgroud)
为此(可惜这个类不是公共 API 的一部分):
static final ThreadLocal<DateFormat> CACHED = new jdk.internal.misc.CarrierThreadLocal();
// CACHED.set(...)
Run Code Online (Sandbox Code Playgroud)
在我们到达那里之前。人们必须问,这是一种安全的做法吗?
好吧,据我正确理解虚拟线程,它们只是在平台线程(又名“载体线程”)上执行的逻辑阶段,能够卸载而不是被阻塞等待。所以我假设 - 如果我错了,请纠正我 - 1)虚拟线程永远不会被同一载体线程上的另一个虚拟线程交错或重新安排在另一个载体线程上,除非代码否则会阻塞,因此,如果 2 )我们对缓存对象调用的操作永远不会阻塞,那么任务/虚拟线程将简单地在同一载体上从头到尾运行,所以是的,将对象缓存在平台线程本地上是安全的。
冒着回答我自己的问题的风险,JEP-425 表明这是不可能的:
载体的线程局部变量对于虚拟线程不可用,反之亦然。
我找不到公共 API 来获取载体线程或在平台线程上显式分配线程局部变量(从虚拟线程),但这并不是说我的研究是确定的。也许有办法吗?
然后我读了JEP-429: Scoped Values,乍一看似乎是 Java 之神试图完全摆脱它ThreadLocal
,或者至少为虚拟线程提供替代方案。事实上,JEP 使用了诸如“迁移到作用域值”之类的措辞,并表示它们“优于线程局部变量,尤其是在使用大量虚拟线程时”。
对于 JEP 中讨论的所有用例,我只能同意。但在本文档的底部,我们还发现了这一点:
有一些场景有利于线程局部变量。一个示例是缓存创建和使用成本高昂的对象,例如 java.text.DateFormat 的实例。众所周知,DateFormat 对象是可变的,因此如果没有同步,就无法在线程之间共享它。通过在线程生命周期内持续存在的线程局部变量为每个线程提供自己的 DateFormat 对象通常是一种实用的方法。
根据前面讨论的内容,使用线程本地可能是“实用的”,但不是很理想。事实上,JEP-429 本身实际上是从一个非常有说服力的评论开始的:“如果一百万个虚拟线程中的每一个都有可变的线程局部变量,那么内存占用可能会很大”。
总结一下:
您是否找到了从虚拟线程在载体线程上分配线程局部变量的方法?
如果不是,那么可以肯定地说,对于使用虚拟线程的应用程序,在线程本地缓存对象的做法已经死亡,并且必须实现/使用不同的方法,例如并发缓存/映射/池/其他方法?
我正在使用 Java 19。我尝试使用新引入的虚拟线程,如下所示:
public static void main(String[] args) {
System.out.println("Started with virutal threads");
try (ExecutorService virtualService = Executors.newVirtualThreadPerTaskExecutor()) {
virtualService.submit(() -> System.out.println("[" + Thread.currentThread().getName() + "] virtual task 1"));
virtualService.submit(() -> System.out.println("[" + Thread.currentThread().getName() + "] virtual task 2"));
}
System.out.println("Finished");
}
Run Code Online (Sandbox Code Playgroud)
该程序的输出是:
Started with virutal threads
[] virtual task 2
[] virtual task 1
Finished
Run Code Online (Sandbox Code Playgroud)
为什么 Thread.currentThread().getName() 没有任何名称?
后续问题:如何识别彼此之间的虚拟线程?(如何识别它们)所以输出看起来像
[thread-1] virtual task 2
[thread-0] virtual task 1
Run Code Online (Sandbox Code Playgroud) JDK 开发人员建议永远不要池化虚拟线程,因为创建和销毁虚拟线程的成本非常低。我对池的想法有点困惑,因为池通常意味着两件事:
我知道 JDK 开发人员希望我们永远不要重用虚拟线程,而生命周期问题让我感到困惑,因为如果有多个虚拟线程的生命周期与应用程序本身一样长,那么听起来可能像是没有重用的池化。
那么,虚拟线程是否应该快速死亡,或者具有较短的有界生命周期,或者多个虚拟线程阻塞、偶尔被唤醒以处理某些任务并且具有非常长的生命周期是否可以?
我偶然发现了虚拟线程和 ConcurrentHashMap 的问题。
正如下面的代码所示,如果您在 中输入锁定computeIfAbsent
,VT 可能永远不会唤醒。这实际上取决于 VT 在载体线程上的调度方式,但我可以轻松地重现这一点。
我知道 ConcurrentHashMap 使用synchronized
,而 VT 不能很好地使用synchronized
,但没有地方说要避免ConcurrentHashMap
,因为那会非常令人失望。
class Scratch {
public static void main(String[] args) throws InterruptedException {
var lock = new ReentrantLock();
var map = new ConcurrentHashMap<Integer, String>();
var callables = new ArrayList<Callable<Void>>();
for (int i = 0; i < 32 /* more than threads in FJ pool */; i++) {
callables.add(() -> {
System.out.println("Spawned thread " + Thread.currentThread().threadId());
map.computeIfAbsent(1, k -> {
System.out.println("on lock() …
Run Code Online (Sandbox Code Playgroud) 我确实知道平台线程很昂贵,因为它需要更多内存并且容易发生 CPU 上下文切换。
但是,在虚拟线程的情况下,少数平台线程可以服务难以想象的大量虚拟线程,虚拟线程是否仍然需要内存空间来钝化上下文/堆栈,然后将其附加到载体线程?
它对记忆有何影响?
为什么自旋 10000 个虚拟线程不会因内存不足而消亡,而 10000 个平台线程则会因内存不足而消亡?
他们都需要相同的堆栈吗?以及需要维护应用程序相关信息的上下文,对吧?
内存中是否存在仅适用于平台线程的额外开销,这就是我们说虚拟线程在内存中“更轻”的原因?如果是的话,是什么造成了这种差异?
在典型的 servlet 环境中,每个请求都有自己的线程。添加日志记录 MDC 来为请求生成唯一的请求 ID 可以通过简单的 servlet 过滤器来实现。
public void doFilter(ServletRequest request, ServletResponse response, FilterChain chain) throws IOException, ServletException {
try {
String requestId = UUID.randomUUID().toString();
HttpServletResponse httpServletResponse = (HttpServletResponse)response;
httpServletResponse.setHeader("requestId", requestId);
MDC.put("requestId", requestId);
chain.doFilter(request, response);
} finally {
MDC.remove("requestId");
}
}
Run Code Online (Sandbox Code Playgroud)
日志记录配置。
<Pattern>%d %-5level %X{requestId} [%t] %C{10}:%L %m%n</Pattern>
Run Code Online (Sandbox Code Playgroud)
样本记录。
2024-02-04 10:29:55,160 INFO 99cd4d64-5d7c-4577-a5d3-cb8d48d1dfd5 [http-nio-8080-exec-6] c.s.q.UserController:65 Deleteing user 'test'
2024-02-04 10:29:55,260 INFO 99cd4d64-5d7c-4577-a5d3-cb8d48d1dfd5 [http-nio-8080-exec-6] c.s.q.UserController:70 Successfully deleted user 'test'
Run Code Online (Sandbox Code Playgroud)
对于 Java 21+ 中的虚拟线程,我的印象是线程可以在等待任何 IO 时自动挂起请求,并且线程可以开始处理其他请求。在这种情况下,当线程开始服务其他请求时,日志记录 MDC 似乎可以“渗入”其他请求日志。如何解决这个问题,以便我可以继续将唯一值添加到每个请求的日志记录语句中?
动机:尝试迁移虚拟线程。
问题:尽管虚拟线程很便宜,但操作系统可能会发现同时堆叠某些进程很可疑,例如在网络上搜索 IP 或端口。
我使用下面的代码来限制线程的创建。TS_NetworkIPUtils TS_NetworkPortUtils
var executor = useVirtualThread
? Executors.newVirtualThreadPerTaskExecutor()
: Executors.newFixedThreadPool(MAX_THREAD_COUNT());
Run Code Online (Sandbox Code Playgroud)
是否可以创建一个执行器服务来创建虚拟线程并同时具有限制功能?
我刚刚将 Spring Boot 应用程序升级到 Java 21。作为其中的一部分,我还进行了更改以使用虚拟线程。无论是在服务 API 请求时还是在使用执行器在内部执行异步操作时。
对于一种用例,由虚拟线程驱动的执行器的性能似乎比ForkJoinPool
由操作系统线程驱动的执行器差。此用例是设置一些 MDC 值并通过 HTTP调用外部系统。
这是我的伪代码:
List<...> ... = executorService.submit(
() -> IntStream.rangeClosed(-from, to)
.mapToObj(i -> ...)
.parallel()
.map(... -> {
try {
service.setSomeThreadLocalString(...);
MDC.put(..., ...);
MDC.put(..., ...);
return service.call(...);
} finally {
service.removeSomeThreadLocalString(...);
MDC.remove(...);
MDC.remove(...);
}
})
.toList())
.get();
Run Code Online (Sandbox Code Playgroud)
其中 ExecutorService 是:
new ForkJoinPool(30)
Executors.newVirtualThreadPerTaskExecutor()
看起来选项 1 的性能比选项 2 好很多。有时它比选项 1 快 100%。我在 Java 21 环境中完成了这个测试。我正在测试 10 个并行执行。其中选项 1 通常需要 800-1000 毫秒,选项 2 通常需要 1500-2000 毫秒。
如果有任何区别,请在 Spring …
如果我创建一个虚拟线程的执行器服务
Executors.newVirtualThreadPerTaskExecutor()
Run Code Online (Sandbox Code Playgroud)
据我所知,每个任务都在虚拟线程上运行。这些虚拟线程通过平台线程进行调度(希望是交错的)。
这里使用了多少个平台线程?我没有看到任何其他 API 来指定我想要的线程数。我在 Macbook M1 Pro 上使用 Java 21 运行它。
我想创建在单独的 Java 线程池上运行的虚拟线程池。
这是我试图创建的架构:
这是为了让我能够创建单独的池来在一个 JVM 中运行批处理任务,并利用每个池的 n:m 映射的虚拟线程。因此,如果我有 12 个核心,那么我可以创建 2 个 6 线程的线程池。每个池只会执行一个特定的任务。每个池将有 N 个虚拟线程。因此,这里的映射将是 2 个 {N VirtualThreads -> 6 Platform Threads} 池。
TLDR,我想限制虚拟线程池可以运行的 PlatformThreads 数量。
我能想到的一件事是,创建线程池,当传入可运行对象时,在 run 方法内我可以创建虚拟线程,但不确定它有多实用,以及我是否会得到我想要的池分区。这种方法的另一个问题是,虚拟线程将仅在一个 java 线程中运行,因此没有 N:M 映射
java threadpoolexecutor java-threads virtual-threads java-21
我知道应该使用 a 来用新的虚拟线程包装所有任务VirtualThread
,而不是使用 pooling s,因为这并不能真正带来好处。Executors.newVirtualThreadPerTaskExecutor()
我想知道是否应该阻止PlatformThread
使用这样的执行者执行 a ,并且通常Thread
s 是否完全不应该传递给执行者。
try (ExecutorService executor = Executors.newVirtualThreadPerTaskExecutor()) {
Runnable platformThread = Thread.ofPlatform().unstarted(someTask);
executor.execute(platformThread);
}
Run Code Online (Sandbox Code Playgroud)
ThreadFactory
由于执行器的原因,以这种方式创建的平台线程将被虚拟线程包装。我试图了解这是否有意义,或者通常Thread
对象是否不应该传递给ExecutorService
s.
java ×13
virtual-threads ×13
java-21 ×6
project-loom ×3
caching ×1
concurrency ×1
deadlock ×1
java-19 ×1
java-threads ×1
kotlin ×1
servlets ×1
thread-local ×1