如何处理超过默认线程数的Java流?

Dio*_*lis 7 java multithreading forkjoinpool java-stream

默认情况下,Java流由公共线程池处理,该线程池使用默认参数构造.正如在另一个问题中所回答的那样,可以通过指定自定义池或通过设置java.util.concurrent.ForkJoinPool.common.parallelism系统参数来调整这些默认值.

但是,我无法通过这两种方法中的任何一种来增加分配给流处理的线程数.例如,请考虑以下程序,该程序处理第一个参数中指定的文件中包含的IP地址列表,并输出已解析的地址.在具有大约13000个唯一IP地址的文件上运行此操作,我看到使用Oracle Java Mission Control只需16个线程.其中只有五名是ForkJoinPool工人.然而,这个特定任务将从更多线程中受益,因为线程大部分时间都在等待DNS响应.所以我的问题是,我怎样才能真正增加使用的线程数?

我在三种环境下尝试过该程序; 这些是操作系统报告的线程数.

  • Java SE Runtime Environment在运行Windows 7:17线程的8核计算机上构建1.8.0_73-b02
  • Java SE运行时环境在运行OS X Darwin 15.2.0的23核机器上构建1.8.0_66-b17:23个线程
  • openjdk版本1.8.0_72在一台运行FreeBSD 11.0的44核机器上:44个线程

import java.io.IOException;
import java.net.InetAddress;
import java.net.UnknownHostException;
import java.nio.file.Files;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.concurrent.ForkJoinPool;

/** Resolve IP addresses in file args[0] using 100 threads */
public class Resolve100 {
    /** Resolve the passed IP address into a name */
    static String addressName(String ipAddress) {
        try {
            return InetAddress.getByName(ipAddress).getHostName();
        } catch (UnknownHostException e) {
            return ipAddress;
        }
    }

    public static void main(String[] args) {
        Path path = Paths.get(args[0]);
        ForkJoinPool fjp = new ForkJoinPool(100);
        try {
            fjp.submit(() -> {
                try {
                    Files.lines(path)
                    .parallel()
                    .map(line -> addressName(line))
                    .forEach(System.out::println);
                } catch (IOException e) {
                    System.err.println("Failed: " + e);
                }
            }).get();
        } catch (Exception e) {
            System.err.println("Failed: " + e);
        }
    }
}
Run Code Online (Sandbox Code Playgroud)

Tag*_*eev 7

您的方法存在两个问题.首先,使用自定义FJP不会更改由流API创建的单个任务的最大数量,因为这是通过以下方式定义:

static final int LEAF_TARGET = ForkJoinPool.getCommonPoolParallelism() << 2;
Run Code Online (Sandbox Code Playgroud)

因此,即使您使用的是自定义池,并行任务的数量也会受到限制commonPoolParallelism * 4.(它实际上不是硬限制,而是目标,但在许多情况下,任务数量等于此数字).

上面的问题可以通过使用java.util.concurrent.ForkJoinPool.common.parallelism系统属性来修复,但是你遇到了另一个问题:Files.lines并行化非常糟糕.有关详情,请参阅此问题.特别是,对于13000个输入线,即使您有100个CPU,最大可能的加速比为3.17x(假设每个线处理的时间大致相同).我的StreamEx为此提供了解决方法(使用创建流StreamEx.ofLines(path).parallel()).另一种可能的解决方案是顺序读取文件行List,然后从中创建并行流:

Files.readAllLines(path).parallelStream()...
Run Code Online (Sandbox Code Playgroud)

这将与系统属性一起使用.但是,通常,当任务涉及I/O时,Stream API不适合并行处理.更灵活的解决方案是CompletableFuture用于每一行:

ForkJoinPool fjp = new ForkJoinPool(100);
List<CompletableFuture<String>> list = Files.lines(path)
    .map(line -> CompletableFuture.supplyAsync(() -> addressName(line), fjp))
    .collect(Collectors.toList());
list.stream().map(CompletableFuture::join)
    .forEach(System.out::println);
Run Code Online (Sandbox Code Playgroud)

这样,您无需调整系统属性,也可以使用单独的池来执行单独的任务.