0 java threadpoolexecutor java-threads virtual-threads java-21
我想创建在单独的 Java 线程池上运行的虚拟线程池。
这是我试图创建的架构:
这是为了让我能够创建单独的池来在一个 JVM 中运行批处理任务,并利用每个池的 n:m 映射的虚拟线程。因此,如果我有 12 个核心,那么我可以创建 2 个 6 线程的线程池。每个池只会执行一个特定的任务。每个池将有 N 个虚拟线程。因此,这里的映射将是 2 个 {N VirtualThreads -> 6 Platform Threads} 池。
TLDR,我想限制虚拟线程池可以运行的 PlatformThreads 数量。
我能想到的一件事是,创建线程池,当传入可运行对象时,在 run 方法内我可以创建虚拟线程,但不确定它有多实用,以及我是否会得到我想要的池分区。这种方法的另一个问题是,虚拟线程将仅在一个 java 线程中运行,因此没有 N:M 映射
虚拟线程的发明是为了避免您似乎正在经历的麻烦。
\n并且,虚拟线程被明确记录为不用于池化。就像面巾纸一样,抓一张新的,用完后扔掉。
\n阅读Java JEP,观看Ron Pressler、Alan Bateman、Jos\xc3\xa9 Paumard等的视频,了解虚拟线程技术的目的和本质。
\n你说:
\n\n\n我的用例使用相互依赖的作业(基本上一个作业的输出为队列中的另一个作业提供输入)。
\n
\xe2\x80\xa6 和:
\n\n\n当传入可运行对象时,在 run 方法中我可以创建虚拟线程
\n
彻底转变您的思维:不要创建虚拟线程来运行一堆相关的级联任务,而是只创建一个线程以串行方式完成所有工作。
\n如果您有一系列级联任务,每个任务在前一个任务\xe2\x80\x98s 结果之后拾取,然后只需将所有工作写入单个Runnable
/中Callable
。在一个新的虚拟线程中执行该单个组合任务。让该虚拟线程运行完成。
让我们设计一个简单的演示应用程序。我们有三项相互关联的任务:TaskA
、TaskB
和TaskC
。它们被安置在一起,作为AlphabetTask
。结果是“ABC”,每个字母都是由每个子任务添加的。
class AlphabetTask implements Callable < String >\n{\n private final UUID id = UUID.randomUUID ( );\n\n @Override\n public String call ( ) throws Exception\n {\n System.out.println ( "Starting AlphabetTask " + this.id + " " + Instant.now ( ) );\n String a = new TaskA ( ).call ( );\n String b = new TaskB ( a ).call ( );\n String c = new TaskC ( b ).call ( );\n System.out.println ( "Ending AlphabetTask " + this.id + " Result: " + c + " " + Instant.now ( ) );\n return c;\n }\n}\n\nclass TaskA implements Callable < String >\n{\n @Override\n public String call ( ) throws Exception\n {\n System.out.println ( "Running TaskA. " + Instant.now ( ) );\n Thread.sleep ( Duration.ofMillis ( ThreadLocalRandom.current ( ).nextInt ( 100 , 800 ) ) );\n return "A";\n }\n}\n\nclass TaskB implements Callable < String >\n{\n private final String input;\n\n public TaskB ( final String input )\n {\n this.input = input;\n }\n\n @Override\n public String call ( ) throws Exception\n {\n System.out.println ( "Running TaskB. " + Instant.now ( ) );\n Thread.sleep ( Duration.ofMillis ( ThreadLocalRandom.current ( ).nextInt ( 100 , 800 ) ) );\n return this.input + "B";\n }\n}\n\nclass TaskC implements Callable < String >\n{\n private final String input;\n\n public TaskC ( final String input )\n {\n this.input = input;\n }\n\n @Override\n public String call ( ) throws Exception\n {\n System.out.println ( "Running TaskC. " + Instant.now ( ) );\n Thread.sleep ( Duration.ofMillis ( ThreadLocalRandom.current ( ).nextInt ( 100 , 800 ) ) );\n return this.input + "C";\n }\n}\n
Run Code Online (Sandbox Code Playgroud)\n我们建立三个AlphabetTask实例。
\nCollection < AlphabetTask > alphabetTasks =\n List.of (\n new AlphabetTask ( ) ,\n new AlphabetTask ( ) ,\n new AlphabetTask ( )\n );\n
Run Code Online (Sandbox Code Playgroud)\n我们将所有这些实例提交给执行器服务。对于这三个线程中的每一个AlphabetTasks
,执行器都会分配一个新的虚拟线程。在每个虚拟线程中,每个子任务都按顺序调用。
请注意,如果任务在一天之内完成,我们可以使用 try-with-resources 语法自动关闭执行程序服务。
\nList < Future < String > > futures = List.of ( );\ntry (\n ExecutorService executorService = Executors.newVirtualThreadPerTaskExecutor ( ) ;\n)\n{\n try\n {\n futures = executorService.invokeAll ( alphabetTasks );\n } catch ( InterruptedException e )\n {\n throw new RuntimeException ( e );\n }\n}\n// The try-with-resources blocks here if executor service has any uncompleted tasks.\nfutures.forEach ( stringFuture -> {\n try\n {\n System.out.println ( stringFuture.get ( ) );\n } catch ( InterruptedException | ExecutionException e )\n {\n throw new RuntimeException ( e );\n }\n} );\n\n
Run Code Online (Sandbox Code Playgroud)\n注意:虚拟线程适用于代码涉及阻塞的任务,例如文件 I/O、网络 I/O、数据库调用、等待锁等。不要使用虚拟线程来运行 CPU 密集型任务,例如视频编码(不涉及阻塞)。
\n运行时:
\nStarting AlphabetTask 3a594ed1-a76e-4927-83b1-2d6bc81f566c 2023-12-03T20:30:03.442091Z\nStarting AlphabetTask 12743216-8e42-4be1-bfc4-1893e08e58a7 2023-12-03T20:30:03.442091Z\nStarting AlphabetTask 94a4d5b9-3ed9-43d4-ba66-509380fa9f8b 2023-12-03T20:30:03.442091Z\nRunning TaskA. 2023-12-03T20:30:03.452388Z\nRunning TaskA. 2023-12-03T20:30:03.452392Z\nRunning TaskA. 2023-12-03T20:30:03.452383Z\nRunning TaskB. 2023-12-03T20:30:03.556780Z\nRunning TaskB. 2023-12-03T20:30:03.687342Z\nRunning TaskC. 2023-12-03T20:30:03.812744Z\nRunning TaskB. 2023-12-03T20:30:04.108820Z\nRunning TaskC. 2023-12-03T20:30:04.278596Z\nEnding AlphabetTask 94a4d5b9-3ed9-43d4-ba66-509380fa9f8b Result: ABC 2023-12-03T20:30:04.310085Z\nRunning TaskC. 2023-12-03T20:30:04.360861Z\nEnding AlphabetTask 3a594ed1-a76e-4927-83b1-2d6bc81f566c Result: ABC 2023-12-03T20:30:04.624803Z\nEnding AlphabetTask 12743216-8e42-4be1-bfc4-1893e08e58a7 Result: ABC 2023-12-03T20:30:04.953132Z\nABC\nABC\nABC\n
Run Code Online (Sandbox Code Playgroud)\n注意:跨线程调用时,控制台上的输出可能不会按时间顺序显示System.out.println
。如果您关心顺序,请包括并检查时间戳。
为了方便您复制粘贴,以下是所有代码,可将其放入单个.java
文件中。
package work.basil.example.threading;\n\nimport java.time.Duration;\nimport java.time.Instant;\nimport java.util.Collection;\nimport java.util.List;\nimport java.util.UUID;\nimport java.util.concurrent.*;\n\npublic class Subtasks\n{\n public static void main ( String[] args )\n {\n Subtasks app = new Subtasks ( );\n app.demo ( );\n }\n\n private void demo ( )\n {\n Collection < AlphabetTask > alphabetTasks =\n List.of (\n new AlphabetTask ( ) ,\n new AlphabetTask ( ) ,\n new AlphabetTask ( )\n );\n List < Future < String > > futures = List.of ( );\n try (\n ExecutorService executorService = Executors.newVirtualThreadPerTaskExecutor ( ) ;\n )\n {\n try\n {\n futures = executorService.invokeAll ( alphabetTasks );\n } catch ( InterruptedException e )\n {\n throw new RuntimeException ( e );\n }\n }\n // The try-with-resources blocks here if executor service has any uncompleted tasks.\n futures.forEach ( stringFuture -> {\n try\n {\n System.out.println ( stringFuture.get ( ) );\n } catch ( InterruptedException | ExecutionException e )\n {\n throw new RuntimeException ( e );\n }\n } );\n }\n}\n\nclass AlphabetTask implements Callable < String >\n{\n private final UUID id = UUID.randomUUID ( );\n\n @Override\n public String call ( ) throws Exception\n {\n System.out.println ( "Starting AlphabetTask " + this.id + " " + Instant.now ( ) );\n String a = new TaskA ( ).call ( );\n String b = new TaskB ( a ).call ( );\n String c = new TaskC ( b ).call ( );\n System.out.println ( "Ending AlphabetTask " + this.id + " Result: " + c + " " + Instant.now ( ) );\n return c;\n }\n}\n\nclass TaskA implements Callable < String >\n{\n @Override\n public String call ( ) throws Exception\n {\n System.out.println ( "Running TaskA. " + Instant.now ( ) );\n Thread.sleep ( Duration.ofMillis ( ThreadLocalRandom.current ( ).nextInt ( 100 , 800 ) ) );\n return "A";\n }\n}\n\nclass TaskB implements Callable < String >\n{\n private final String input;\n\n public TaskB ( final String input )\n {\n this.input = input;\n }\n\n @Override\n public String call ( ) throws Exception\n {\n System.out.println ( "Running TaskB. " + Instant.now ( ) );\n Thread.sleep ( Duration.ofMillis ( ThreadLocalRandom.current ( ).nextInt ( 100 , 800 ) ) );\n return this.input + "B";\n }\n}\n\nclass TaskC implements Callable < String >\n{\n private final String input;\n\n public TaskC ( final String input )\n {\n this.input = input;\n }\n\n @Override\n public String call ( ) throws Exception\n {\n System.out.println ( "Running TaskC. " + Instant.now ( ) );\n Thread.sleep ( Duration.ofMillis ( ThreadLocalRandom.current ( ).nextInt ( 100 , 800 ) ) );\n return this.input + "C";\n }\n}\n
Run Code Online (Sandbox Code Playgroud)\n你说:
\n\n\n因此,如果我有 12 个核心,那么我可以创建 2 个 6 线程的线程池。
\n
你并没有像你想象的那样拥有那么多的控制权。哪些平台线程在哪个核心上运行、何时运行、持续时间长短完全取决于主机操作系统。并且调度行为根据计算机上的当前负载而随时变化。在任何时候,都可能没有、很少或全部线程正在执行。
\n 归档时间: |
|
查看次数: |
245 次 |
最近记录: |