Java 8:并行流等待所有线程完成任务

Him*_*dav 5 java java-8 java-stream

使用并行流处理具有大量数据的大量文件并将它们写入特定格式。这是代码:

public static void main(String[] args) throws Exception {
   mergeController.compactFiles();
   mergeController.writeMergedFlag();
}
private void compactFiles() {
  Set<String> events = this.listSubDirectoryNames(inputDir);
  events.parallelStream().forEach(event -> writeEvent(event, eventSchemaMap.get(event), this.configuration));
}
Run Code Online (Sandbox Code Playgroud)

这些方法不返回任何内容,因为它们只是在写入文件。我看到writeMergedFlag()主要是在运行该过程 1.5 小时后被调用。

这里有什么问题?是堆空间问题还是别的什么?我以前没有遇到过这种类型的问题。

Iva*_*van 1

parallelStream()默认情况下,JVM 中的所有内容都使用ForkJoinPool.commonPool()具有numberOfCPUs - 1工作线程的相同内容。因此,在您的情况下,您首先需要使用探查器检查消耗时间的内容,如果只有大量文件需要处理,您可以为并行流使用自定义线程池。

private void compactFiles() throws Exception {
  Set<String> events = this.listSubDirectoryNames(inputDir);
  ForkJoinPool customThreadPool = new ForkJoinPool(4); // you might need to adjust this value to find optimal performance
  customThreadPool.submit(() -> events.parallelStream().forEach(event -> writeEvent(event, eventSchemaMap.get(event), this.configuration))).get(); //Due to how ForkJoin pool works tasks will be submitted to the same pool which was used to execute parent task
}
Run Code Online (Sandbox Code Playgroud)

http://www.baeldung.com/java-8-parallel-streams-custom-threadpool