为java中的平台线程池创建单独的线程池

0 java threadpoolexecutor java-threads virtual-threads java-21

我想创建在单独的 Java 线程池上运行的虚拟线程池。

这是我试图创建的架构:

建筑学

这是为了让我能够创建单独的池来在一个 JVM 中运行批处理任务,并利用每个池的 n:m 映射的虚拟线程。因此,如果我有 12 个核心,那么我可以创建 2 个 6 线程的线程池。每个池只会执行一个特定的任务。每个池将有 N 个虚拟线程。因此,这里的映射将是 2 个 {N VirtualThreads -> 6 Platform Threads} 池。

TLDR,我想限制虚拟线程池可以运行的 PlatformThreads 数量。

我能想到的一件事是,创建线程池,当传入可运行对象时,在 run 方法内我可以创建虚拟线程,但不确定它有多实用,以及我是否会得到我想要的池分区。这种方法的另一个问题是,虚拟线程将仅在一个 java 线程中运行,因此没有 N:M 映射

Bas*_*que 7

虚拟线程的发明是为了避免您似乎正在经历的麻烦。

\n

并且,虚拟线程被明确记录为用于池化。就像面巾纸一样,抓一张新的,用完后扔掉。

\n

阅读Java JEP,观看Ron Pressler、Alan Bateman、Jos\xc3\xa9 Paumard等的视频,了解虚拟线程技术的目的和本质。

\n

你说:

\n
\n

我的用例使用相互依赖的作业(基本上一个作业的输出为队列中的另一个作业提供输入)。

\n
\n

\xe2\x80\xa6 和:

\n
\n

当传入可运行对象时,在 run 方法中我可以创建虚拟线程

\n
\n

彻底转变您的思维:不要创建虚拟线程来运行一堆相关的级联任务,而是只创建一个线程以串行方式完成所有工作。

\n

如果您有一系列级联任务,每个任务在前一个任务\xe2\x80\x98s 结果之后拾取,然后只需将所有工作写入单个Runnable/中Callable。在一个新的虚拟线程中执行该单个组合任务。让该虚拟线程运行完成。

\n

让我们设计一个简单的演示应用程序。我们有三项相互关联的任务:TaskATaskBTaskC。它们被安置在一起,作为AlphabetTask。结果是“ABC”,每个字母都是由每个子任务添加的。

\n
class AlphabetTask implements Callable < String >\n{\n    private final UUID id = UUID.randomUUID ( );\n\n    @Override\n    public String call ( ) throws Exception\n    {\n        System.out.println ( "Starting AlphabetTask " + this.id + " " + Instant.now ( ) );\n        String a = new TaskA ( ).call ( );\n        String b = new TaskB ( a ).call ( );\n        String c = new TaskC ( b ).call ( );\n        System.out.println ( "Ending AlphabetTask " + this.id + " Result: " + c + " " + Instant.now ( ) );\n        return c;\n    }\n}\n\nclass TaskA implements Callable < String >\n{\n    @Override\n    public String call ( ) throws Exception\n    {\n        System.out.println ( "Running TaskA. " + Instant.now ( ) );\n        Thread.sleep ( Duration.ofMillis ( ThreadLocalRandom.current ( ).nextInt ( 100 , 800 ) ) );\n        return "A";\n    }\n}\n\nclass TaskB implements Callable < String >\n{\n    private final String input;\n\n    public TaskB ( final String input )\n    {\n        this.input = input;\n    }\n\n    @Override\n    public String call ( ) throws Exception\n    {\n        System.out.println ( "Running TaskB. " + Instant.now ( ) );\n        Thread.sleep ( Duration.ofMillis ( ThreadLocalRandom.current ( ).nextInt ( 100 , 800 ) ) );\n        return this.input + "B";\n    }\n}\n\nclass TaskC implements Callable < String >\n{\n    private final String input;\n\n    public TaskC ( final String input )\n    {\n        this.input = input;\n    }\n\n    @Override\n    public String call ( ) throws Exception\n    {\n        System.out.println ( "Running TaskC. " + Instant.now ( ) );\n        Thread.sleep ( Duration.ofMillis ( ThreadLocalRandom.current ( ).nextInt ( 100 , 800 ) ) );\n        return this.input + "C";\n    }\n}\n
Run Code Online (Sandbox Code Playgroud)\n

我们建立三个AlphabetTask实例。

\n
Collection < AlphabetTask > alphabetTasks =\n        List.of (\n                new AlphabetTask ( ) ,\n                new AlphabetTask ( ) ,\n                new AlphabetTask ( )\n        );\n
Run Code Online (Sandbox Code Playgroud)\n

我们将所有这些实例提交给执行器服务。对于这三个线程中的每一个AlphabetTasks,执行器都会分配一个新的虚拟线程。在每个虚拟线程中,每个子任务都按顺序调用。

\n

请注意,如果任务在一天之内完成,我们可以使用 try-with-resources 语法自动关闭执行程序服务。

\n
List < Future < String > > futures = List.of ( );\ntry (\n        ExecutorService executorService = Executors.newVirtualThreadPerTaskExecutor ( ) ;\n)\n{\n    try\n    {\n        futures = executorService.invokeAll ( alphabetTasks );\n    } catch ( InterruptedException e )\n    {\n        throw new RuntimeException ( e );\n    }\n}\n// The try-with-resources blocks here if executor service has any uncompleted tasks.\nfutures.forEach ( stringFuture -> {\n    try\n    {\n        System.out.println ( stringFuture.get ( ) );\n    } catch ( InterruptedException | ExecutionException e )\n    {\n        throw new RuntimeException ( e );\n    }\n} );\n\n
Run Code Online (Sandbox Code Playgroud)\n

注意:虚拟线程适用于代码涉及阻塞的任务,例如文件 I/O、网络 I/O、数据库调用、等待锁等。不要使用虚拟线程来运行 CPU 密集型任务,例如视频编码(不涉及阻塞)。

\n

运行时:

\n
Starting AlphabetTask 3a594ed1-a76e-4927-83b1-2d6bc81f566c 2023-12-03T20:30:03.442091Z\nStarting AlphabetTask 12743216-8e42-4be1-bfc4-1893e08e58a7 2023-12-03T20:30:03.442091Z\nStarting AlphabetTask 94a4d5b9-3ed9-43d4-ba66-509380fa9f8b 2023-12-03T20:30:03.442091Z\nRunning TaskA. 2023-12-03T20:30:03.452388Z\nRunning TaskA. 2023-12-03T20:30:03.452392Z\nRunning TaskA. 2023-12-03T20:30:03.452383Z\nRunning TaskB. 2023-12-03T20:30:03.556780Z\nRunning TaskB. 2023-12-03T20:30:03.687342Z\nRunning TaskC. 2023-12-03T20:30:03.812744Z\nRunning TaskB. 2023-12-03T20:30:04.108820Z\nRunning TaskC. 2023-12-03T20:30:04.278596Z\nEnding AlphabetTask 94a4d5b9-3ed9-43d4-ba66-509380fa9f8b Result: ABC 2023-12-03T20:30:04.310085Z\nRunning TaskC. 2023-12-03T20:30:04.360861Z\nEnding AlphabetTask 3a594ed1-a76e-4927-83b1-2d6bc81f566c Result: ABC 2023-12-03T20:30:04.624803Z\nEnding AlphabetTask 12743216-8e42-4be1-bfc4-1893e08e58a7 Result: ABC 2023-12-03T20:30:04.953132Z\nABC\nABC\nABC\n
Run Code Online (Sandbox Code Playgroud)\n

注意:跨线程调用时,控制台上的输出可能不会按时间顺序显示System.out.println。如果您关心顺序,请包括并检查时间戳。

\n
\n

为了方便您复制粘贴,以下是所有代码,可将其放入单个.java文件中。

\n
package work.basil.example.threading;\n\nimport java.time.Duration;\nimport java.time.Instant;\nimport java.util.Collection;\nimport java.util.List;\nimport java.util.UUID;\nimport java.util.concurrent.*;\n\npublic class Subtasks\n{\n    public static void main ( String[] args )\n    {\n        Subtasks app = new Subtasks ( );\n        app.demo ( );\n    }\n\n    private void demo ( )\n    {\n        Collection < AlphabetTask > alphabetTasks =\n                List.of (\n                        new AlphabetTask ( ) ,\n                        new AlphabetTask ( ) ,\n                        new AlphabetTask ( )\n                );\n        List < Future < String > > futures = List.of ( );\n        try (\n                ExecutorService executorService = Executors.newVirtualThreadPerTaskExecutor ( ) ;\n        )\n        {\n            try\n            {\n                futures = executorService.invokeAll ( alphabetTasks );\n            } catch ( InterruptedException e )\n            {\n                throw new RuntimeException ( e );\n            }\n        }\n        // The try-with-resources blocks here if executor service has any uncompleted tasks.\n        futures.forEach ( stringFuture -> {\n            try\n            {\n                System.out.println ( stringFuture.get ( ) );\n            } catch ( InterruptedException | ExecutionException e )\n            {\n                throw new RuntimeException ( e );\n            }\n        } );\n    }\n}\n\nclass AlphabetTask implements Callable < String >\n{\n    private final UUID id = UUID.randomUUID ( );\n\n    @Override\n    public String call ( ) throws Exception\n    {\n        System.out.println ( "Starting AlphabetTask " + this.id + " " + Instant.now ( ) );\n        String a = new TaskA ( ).call ( );\n        String b = new TaskB ( a ).call ( );\n        String c = new TaskC ( b ).call ( );\n        System.out.println ( "Ending AlphabetTask " + this.id + " Result: " + c + " " + Instant.now ( ) );\n        return c;\n    }\n}\n\nclass TaskA implements Callable < String >\n{\n    @Override\n    public String call ( ) throws Exception\n    {\n        System.out.println ( "Running TaskA. " + Instant.now ( ) );\n        Thread.sleep ( Duration.ofMillis ( ThreadLocalRandom.current ( ).nextInt ( 100 , 800 ) ) );\n        return "A";\n    }\n}\n\nclass TaskB implements Callable < String >\n{\n    private final String input;\n\n    public TaskB ( final String input )\n    {\n        this.input = input;\n    }\n\n    @Override\n    public String call ( ) throws Exception\n    {\n        System.out.println ( "Running TaskB. " + Instant.now ( ) );\n        Thread.sleep ( Duration.ofMillis ( ThreadLocalRandom.current ( ).nextInt ( 100 , 800 ) ) );\n        return this.input + "B";\n    }\n}\n\nclass TaskC implements Callable < String >\n{\n    private final String input;\n\n    public TaskC ( final String input )\n    {\n        this.input = input;\n    }\n\n    @Override\n    public String call ( ) throws Exception\n    {\n        System.out.println ( "Running TaskC. " + Instant.now ( ) );\n        Thread.sleep ( Duration.ofMillis ( ThreadLocalRandom.current ( ).nextInt ( 100 , 800 ) ) );\n        return this.input + "C";\n    }\n}\n
Run Code Online (Sandbox Code Playgroud)\n

你说:

\n
\n

因此,如果我有 12 个核心,那么我可以创建 2 个 6 线程的线程池。

\n
\n

你并没有像你想象的那样拥有那么多的控制权。哪些平台线程在哪个核心上运行、何时运行、持续时间长短完全取决于主机操作系统。并且调度行为根据计算机上的当前负载而随时变化。在任何时候,都可能没有、很少或全部线程正在执行。

\n