在 Loom 中,我可以使用虚拟线程进行递归[操作/任务]吗?

Hel*_*eat 5 java fork-join project-loom structured-concurrency

例如,是否可以使用 RecursiveAction 与虚拟线程池(而不是 fork/join 池)结合使用(在我尝试设计不良的自定义工作之前)?

Hol*_*ger 3

RecursiveAction是它的一个子类ForkJoinTask,顾名思义,文档甚至从字面上说,是一个

\n
\n

ForkJoinPool.

\n
\n

虽然ForkJoinPool可以使用线程工厂进行定制,但它\xe2\x80\x99并不是标准的线程工厂,而是一个用于生产实例的特殊工厂ForkJoinWorkerThread。由于这些线程是 的子类Thread,因此不能使用虚拟线程工厂创建它们。

\n

RecursiveAction因此,您可以\xe2\x80\x99t与虚拟线程一起使用。这同样适用于RecursiveTask. 但\xe2\x80\x99s值得重新思考使用这些类与虚拟线程会给你带来什么。

\n

无论如何,将任务分解为子任务的主要挑战在于您。这些类为您提供的是专门用于处理 Fork/Join 池以及通过可用平台线程平衡工作负载的功能。当您想在其自己的虚拟线程上执行每个子任务时,您不需要这个。因此,您可以轻松地使用虚拟线程实现递归任务,而无需内置类,例如

\n
record PseudoTask(int from, int to) {\n    public static CompletableFuture<Void> run(int from, int to) {\n        return CompletableFuture.runAsync(\n            new PseudoTask(from, to)::compute, Thread::startVirtualThread);\n    }\n\n    protected void compute() {\n        int mid = (from + to) >>> 1;\n        if(mid == from) {\n            // simulate actual processing with potentially blocking operations\n            LockSupport.parkNanos(TimeUnit.MILLISECONDS.toNanos(500));\n        }\n        else {\n            CompletableFuture<Void> sub1 = run(from, mid), sub2 = run(mid, to);\n            sub1.join();\n            sub2.join();\n        }\n    }\n}\n
Run Code Online (Sandbox Code Playgroud)\n

这个例子只是不关心限制细分也不关心避免阻塞join()调用,并且它在运行时仍然表现良好,例如PseudoTask.run(0, 1_000).join();您可能会注意到,对于更大的范围,从其他递归任务实现中已知的技术在这里也很有用,其中子任务相当便宜。

\n

例如,您可能只将范围的一半提交到另一个线程并在本地处理另一半,例如

\n
record PseudoTask(int from, int to) {\n    public static CompletableFuture<Void> run(int from, int to) {\n        return CompletableFuture.runAsync(\n            new PseudoTask(from, to)::compute, Thread::startVirtualThread);\n    }\n\n    protected void compute() {\n        CompletableFuture<Void> f = null;\n        for(int from = this.from, mid; ; from = mid) {\n            mid = (from + to) >>> 1;\n            if (mid == from) {\n                // simulate actual processing with potentially blocking operations\n                LockSupport.parkNanos(TimeUnit.MILLISECONDS.toNanos(500));\n                break;\n            } else {\n                CompletableFuture<Void> sub1 = run(from, mid);\n                if(f == null) f = sub1; else f = CompletableFuture.allOf(f, sub1);\n            }\n        }\n        if(f != null) f.join();\n    }\n}\n
Run Code Online (Sandbox Code Playgroud)\n

这在跑步时会产生显着的差异,例如PseudoTask.run(0, 1_000_000).join();在第二个示例中仅使用 100 万个线程,而不是 200 万个。但是,当然,\xe2\x80\x99 是与平台线程不同级别的讨论,在平台线程中,两种方法都无法合理工作。

\n
\n

另一个即将推出的选项是StructuredTaskScope允许生成子任务并等待其完成

\n
record PseudoTask(int from, int to) {\n    public static void run(int from, int to) {\n        try (var scope = new StructuredTaskScope.ShutdownOnFailure()) {\n            new PseudoTask(from, to).compute(scope);\n            scope.join();\n        } catch (InterruptedException e) {\n            throw new IllegalStateException(e);\n        }\n    }\n\n    protected Void compute(StructuredTaskScope<Object> scope) {\n        for(int from = this.from, mid; ; from = mid) {\n            mid = (from + to) >>> 1;\n            if (mid == from) {\n                // simulate actual processing with potentially blocking operations\n                LockSupport.parkNanos(TimeUnit.MILLISECONDS.toNanos(500));\n                break;\n            } else {\n                var sub = new PseudoTask(from, mid);\n                scope.fork(() -> sub.compute(scope));\n            }\n        }\n        return null;\n    }\n}\n
Run Code Online (Sandbox Code Playgroud)\n

这里,任务不等待其子任务的完成,而只有根任务等待所有任务的完成。但此功能处于孵化器状态,​​因此可能需要比虚拟线程功能更长的时间才能投入生产。

\n