Nis*_*ade 5 java multithreading java-8 completable-future
Java版本:11
我有对象列表,我想对它们执行某些操作,其中一个操作取决于另一个操作的结果。为了以异步方式实现此工作流程,我正在使用CompletableFuture.
目前,我正在通过将列表分区为子列表并将每个子列表赋予 来实现此目的CompletableFuture,因此线程池中的每个线程都可以开始处理该子列表。
我使用和工作的上述方法的代码是:
List<SomeObject> someObjectList // original list
List<List<SomeObject>> partitionList = ListUtils.partition(someObjectList, partionSize);
partitionList.forEach(subList -> {
CompletableFuture.supplyAsync(() -> firstOperation(subList), executorService)
.thenAcceptAsync(firstOpresult -> secondOperationWithFirstOpResult(firstOpresult),executorService);
});
public static List<String> firstOperation(List<SomeObject> subList){
//perform operation
return List<String>;
}
public static void secondOperationWithFirstOpResult(List<String> firstOpProducedList) {
//perform operation
//print results.
}
Run Code Online (Sandbox Code Playgroud)
这里的问题是对原始列表进行分区,
因为如果我的原始列表有 10 万条记录,分区大小为 100(这意味着我希望每个子列表中有 100 个项目),我将有 1000 个子列表对象,每个对象包含 100 个元素,考虑到内存中有这么多对象,这可能不太好,此外,如果分区大小是用户控制/配置控制器,则较小的分区大小将导致子列表对象数量过多。
而不是对原始列表进行分区,
我知道 SO 不是精确解决方案的地方,但如果你能把我推向正确的方向,一个伪代码,或者即使这CompletableFuture首先是可能的,也会有很大的帮助:)
由于ListUtils.partition不是标准方法,因此无法说出它是如何工作的。但如果它以聪明的方式\xe2\x80\x9c工作\xe2\x80\x9d,它将subList在原始列表上使用,而不是复制数据。
如果你想安全起见,你可以自己进行简单的分区:
\nIntStream.range(0, (someObjectList.size() + partionSize - 1) / partionSize)\n .forEach(p -> {\n int start = p * partionSize,\n end = Math.min(someObjectList.size(), start + partionSize);\n List<SomeObject> subList = someObjectList.subList(start, end);\n CompletableFuture.supplyAsync(() -> firstOperation(subList), executorService)\n .thenAcceptAsync(r -> secondOperationWithFirstOpResult(r), executorService);\n });\nRun Code Online (Sandbox Code Playgroud)\nCompletableFuture例如,由于这些子列表不存储元素,因此它们比实例消耗更少的内存。所以这没什么好担心的。
但是如果您可以接受使用默认线程池 \xc2\xb9 而不是executorService,则可以使用
IntStream.range(0, (someObjectList.size() + partionSize - 1) / partionSize)\n .parallel()\n .forEach(p -> {\n int start = p * partionSize,\n end = Math.min(someObjectList.size(), start + partionSize);\n List<SomeObject> subList = someObjectList.subList(start, end);\n secondOperationWithFirstOpResult(firstOperation(subList));\n });\nRun Code Online (Sandbox Code Playgroud)\n其中每个子列表仅在处理时存在。
\n这已经在幕后使用了 Fork/Join 任务。无需自己实现这些 Fork/Join 操作。
\n\xc2\xb9 默认池未指定,但将ForkJoinPool.commonPool()在实践中会指定。