如何使用 CompletableFuture 对 ArrayList 中的一批元素执行操作?

Nis*_*ade 5 java multithreading java-8 completable-future

Java版本:11

我有对象列表,我想对它们执行某些操作,其中一个操作取决于另一个操作的结果。为了以异步方式实现此工作流程,我正在使用CompletableFuture.

目前,我正在通过将列表分区为子列表并将每个子列表赋予 来实现此目的CompletableFuture,因此线程池中的每个线程都可以开始处理该子列表。

我使用和工作的上述方法的代码是:

List<SomeObject> someObjectList // original list

List<List<SomeObject>> partitionList = ListUtils.partition(someObjectList, partionSize);

partitionList.forEach(subList -> {
    CompletableFuture.supplyAsync(() ->  firstOperation(subList), executorService)
            .thenAcceptAsync(firstOpresult -> secondOperationWithFirstOpResult(firstOpresult),executorService);
});

public static List<String> firstOperation(List<SomeObject> subList){
        //perform operation
        return List<String>;
}

public static void secondOperationWithFirstOpResult(List<String> firstOpProducedList) {
        //perform operation
        //print results.
}

Run Code Online (Sandbox Code Playgroud)

这里的问题是对原始列表进行分区,

因为如果我的原始列表有 10 万条记录,分区大小为 100(这意味着我希望每个子列表中有 100 个项目),我将有 1000 个子列表对象,每个对象包含 100 个元素,考虑到内存中有这么多对象,这可能不太好,此外,如果分区大小是用户控制/配置控制器,则较小的分区大小将导致子列表对象数量过多。

而不是对原始列表进行分区,

  1. 我想要原始列表。(比如说 100 个元素)
  2. 在原始列表上有一个 startIndex 和 endIndex(比如 0 到 9、10 到 19...)
  3. 并将每个批次分配给一个线程,在带有 CompletableFuture 的线程池中
  4. 所以这个线程可以执行这两个操作。

我知道 SO 不是精确解决方案的地方,但如果你能把我推向正确的方向,一个伪代码,或者即使这CompletableFuture首先是可能的,也会有很大的帮助:)

Hol*_*ger 5

由于ListUtils.partition不是标准方法,因此无法说出它是如何工作的。但如果它以聪明的方式\xe2\x80\x9c工作\xe2\x80\x9d,它将subList在原始列表上使用,而不是复制数据。

\n

如果你想安全起见,你可以自己进行简单的分区:

\n
IntStream.range(0, (someObjectList.size() + partionSize - 1) / partionSize)\n    .forEach(p -> {\n        int start = p * partionSize,\n            end = Math.min(someObjectList.size(), start + partionSize);\n        List<SomeObject> subList = someObjectList.subList(start, end);\n        CompletableFuture.supplyAsync(() ->  firstOperation(subList), executorService)\n            .thenAcceptAsync(r -> secondOperationWithFirstOpResult(r), executorService);\n    });\n
Run Code Online (Sandbox Code Playgroud)\n

CompletableFuture例如,由于这些子列表不存储元素,因此它们比实例消耗更少的内存。所以这没什么好担心的。

\n

但是如果您可以接受使用默认线程池 \xc2\xb9 而不是executorService,则可以使用

\n
IntStream.range(0, (someObjectList.size() + partionSize - 1) / partionSize)\n    .parallel()\n    .forEach(p -> {\n        int start = p * partionSize,\n            end = Math.min(someObjectList.size(), start + partionSize);\n        List<SomeObject> subList = someObjectList.subList(start, end);\n        secondOperationWithFirstOpResult(firstOperation(subList));\n    });\n
Run Code Online (Sandbox Code Playgroud)\n

其中每个子列表仅在处理时存在。

\n

这已经在幕后使用了 Fork/Join 任务。无需自己实现这些 Fork/Join 操作。

\n

\xc2\xb9 默认池未指定,但将ForkJoinPool.commonPool()在实践中会指定。

\n

  • 这只是为了提醒您,有些成本您可能根本没有考虑到,因为它们在大多数情况下可以忽略不计,并且子列表视图甚至低于该栏。一个可完成的未来必须跟踪其完成状态和相关操作,然后,实际作业被封装在提交给执行器服务的另一个对象中。总的来说,这并不多,只是子列表更少(在大多数情况下,它实际上取决于派生它的特定列表实现)。 (2认同)