Dio*_*lis 7 java multithreading forkjoinpool java-stream
默认情况下,Java流由公共线程池处理,该线程池使用默认参数构造.正如在另一个问题中所回答的那样,可以通过指定自定义池或通过设置java.util.concurrent.ForkJoinPool.common.parallelism系统参数来调整这些默认值.
但是,我无法通过这两种方法中的任何一种来增加分配给流处理的线程数.例如,请考虑以下程序,该程序处理第一个参数中指定的文件中包含的IP地址列表,并输出已解析的地址.在具有大约13000个唯一IP地址的文件上运行此操作,我看到使用Oracle Java Mission Control只需16个线程.其中只有五名是ForkJoinPool工人.然而,这个特定任务将从更多线程中受益,因为线程大部分时间都在等待DNS响应.所以我的问题是,我怎样才能真正增加使用的线程数?
我在三种环境下尝试过该程序; 这些是操作系统报告的线程数.
import java.io.IOException;
import java.net.InetAddress;
import java.net.UnknownHostException;
import java.nio.file.Files;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.concurrent.ForkJoinPool;
/** Resolve IP addresses in file args[0] using 100 threads */
public class Resolve100 {
/** Resolve the passed IP address into a name */
static String addressName(String ipAddress) {
try {
return InetAddress.getByName(ipAddress).getHostName();
} catch (UnknownHostException e) {
return ipAddress;
}
}
public static void main(String[] args) {
Path path = Paths.get(args[0]);
ForkJoinPool fjp = new ForkJoinPool(100);
try {
fjp.submit(() -> {
try {
Files.lines(path)
.parallel()
.map(line -> addressName(line))
.forEach(System.out::println);
} catch (IOException e) {
System.err.println("Failed: " + e);
}
}).get();
} catch (Exception e) {
System.err.println("Failed: " + e);
}
}
}
Run Code Online (Sandbox Code Playgroud)
您的方法存在两个问题.首先,使用自定义FJP不会更改由流API创建的单个任务的最大数量,因为这是通过以下方式定义的:
static final int LEAF_TARGET = ForkJoinPool.getCommonPoolParallelism() << 2;
Run Code Online (Sandbox Code Playgroud)
因此,即使您使用的是自定义池,并行任务的数量也会受到限制commonPoolParallelism * 4.(它实际上不是硬限制,而是目标,但在许多情况下,任务数量等于此数字).
上面的问题可以通过使用java.util.concurrent.ForkJoinPool.common.parallelism系统属性来修复,但是你遇到了另一个问题:Files.lines并行化非常糟糕.有关详情,请参阅此问题.特别是,对于13000个输入线,即使您有100个CPU,最大可能的加速比为3.17x(假设每个线处理的时间大致相同).我的StreamEx库为此提供了解决方法(使用创建流StreamEx.ofLines(path).parallel()).另一种可能的解决方案是顺序读取文件行List,然后从中创建并行流:
Files.readAllLines(path).parallelStream()...
Run Code Online (Sandbox Code Playgroud)
这将与系统属性一起使用.但是,通常,当任务涉及I/O时,Stream API不适合并行处理.更灵活的解决方案是CompletableFuture用于每一行:
ForkJoinPool fjp = new ForkJoinPool(100);
List<CompletableFuture<String>> list = Files.lines(path)
.map(line -> CompletableFuture.supplyAsync(() -> addressName(line), fjp))
.collect(Collectors.toList());
list.stream().map(CompletableFuture::join)
.forEach(System.out::println);
Run Code Online (Sandbox Code Playgroud)
这样,您无需调整系统属性,也可以使用单独的池来执行单独的任务.
| 归档时间: |
|
| 查看次数: |
2023 次 |
| 最近记录: |