Hel*_*eat 5 java fork-join project-loom structured-concurrency
例如,是否可以使用 RecursiveAction 与虚拟线程池(而不是 fork/join 池)结合使用(在我尝试设计不良的自定义工作之前)?
RecursiveAction是它的一个子类ForkJoinTask,顾名思义,文档甚至从字面上说,是一个
\n\n\n
虽然ForkJoinPool可以使用线程工厂进行定制,但它\xe2\x80\x99并不是标准的线程工厂,而是一个用于生产实例的特殊工厂ForkJoinWorkerThread。由于这些线程是 的子类Thread,因此不能使用虚拟线程工厂创建它们。
RecursiveAction因此,您可以\xe2\x80\x99t与虚拟线程一起使用。这同样适用于RecursiveTask. 但\xe2\x80\x99s值得重新思考使用这些类与虚拟线程会给你带来什么。
无论如何,将任务分解为子任务的主要挑战在于您。这些类为您提供的是专门用于处理 Fork/Join 池以及通过可用平台线程平衡工作负载的功能。当您想在其自己的虚拟线程上执行每个子任务时,您不需要这个。因此,您可以轻松地使用虚拟线程实现递归任务,而无需内置类,例如
\nrecord PseudoTask(int from, int to) {\n public static CompletableFuture<Void> run(int from, int to) {\n return CompletableFuture.runAsync(\n new PseudoTask(from, to)::compute, Thread::startVirtualThread);\n }\n\n protected void compute() {\n int mid = (from + to) >>> 1;\n if(mid == from) {\n // simulate actual processing with potentially blocking operations\n LockSupport.parkNanos(TimeUnit.MILLISECONDS.toNanos(500));\n }\n else {\n CompletableFuture<Void> sub1 = run(from, mid), sub2 = run(mid, to);\n sub1.join();\n sub2.join();\n }\n }\n}\nRun Code Online (Sandbox Code Playgroud)\n这个例子只是不关心限制细分也不关心避免阻塞join()调用,并且它在运行时仍然表现良好,例如PseudoTask.run(0, 1_000).join();您可能会注意到,对于更大的范围,从其他递归任务实现中已知的技术在这里也很有用,其中子任务相当便宜。
例如,您可能只将范围的一半提交到另一个线程并在本地处理另一半,例如
\nrecord PseudoTask(int from, int to) {\n public static CompletableFuture<Void> run(int from, int to) {\n return CompletableFuture.runAsync(\n new PseudoTask(from, to)::compute, Thread::startVirtualThread);\n }\n\n protected void compute() {\n CompletableFuture<Void> f = null;\n for(int from = this.from, mid; ; from = mid) {\n mid = (from + to) >>> 1;\n if (mid == from) {\n // simulate actual processing with potentially blocking operations\n LockSupport.parkNanos(TimeUnit.MILLISECONDS.toNanos(500));\n break;\n } else {\n CompletableFuture<Void> sub1 = run(from, mid);\n if(f == null) f = sub1; else f = CompletableFuture.allOf(f, sub1);\n }\n }\n if(f != null) f.join();\n }\n}\nRun Code Online (Sandbox Code Playgroud)\n这在跑步时会产生显着的差异,例如PseudoTask.run(0, 1_000_000).join();在第二个示例中仅使用 100 万个线程,而不是 200 万个。但是,当然,\xe2\x80\x99 是与平台线程不同级别的讨论,在平台线程中,两种方法都无法合理工作。
另一个即将推出的选项是StructuredTaskScope允许生成子任务并等待其完成
record PseudoTask(int from, int to) {\n public static void run(int from, int to) {\n try (var scope = new StructuredTaskScope.ShutdownOnFailure()) {\n new PseudoTask(from, to).compute(scope);\n scope.join();\n } catch (InterruptedException e) {\n throw new IllegalStateException(e);\n }\n }\n\n protected Void compute(StructuredTaskScope<Object> scope) {\n for(int from = this.from, mid; ; from = mid) {\n mid = (from + to) >>> 1;\n if (mid == from) {\n // simulate actual processing with potentially blocking operations\n LockSupport.parkNanos(TimeUnit.MILLISECONDS.toNanos(500));\n break;\n } else {\n var sub = new PseudoTask(from, mid);\n scope.fork(() -> sub.compute(scope));\n }\n }\n return null;\n }\n}\nRun Code Online (Sandbox Code Playgroud)\n这里,任务不等待其子任务的完成,而只有根任务等待所有任务的完成。但此功能处于孵化器状态,因此可能需要比虚拟线程功能更长的时间才能投入生产。
\n