Dis*_*ive 7 java parallel-processing concurrency java-8 java-stream
如果输入大小太小,库会自动序列化流中地图的执行,但这种自动化不会,也不能考虑地图操作的重要程度.有没有办法强制parallelStream()实际并行化CPU 重图?
Hol*_*ger 13
似乎存在一个根本性的误解.链接的问答讨论了由于OP没有看到预期的加速,流显然不能并行工作.结论是,如果工作负载太小,并行处理没有任何好处,而不是自动回退到顺序执行.
实际上恰恰相反.如果您请求并行,即使它实际上降低了性能,您也会得到并行.在这种情况下,实现不会切换到可能更有效的顺序执行.
因此,如果您确信每个元素的工作负载足够高,无论元素数量少,都可以证明并行执行的使用是合理的,您可以简单地请求并行执行.
可以很容易地证明:
Stream.of(1, 2).parallel()
      .peek(x -> System.out.println("processing "+x+" in "+Thread.currentThread()))
      .forEach(System.out::println);
在Ideone上,它会打印出来
processing 2 in Thread[main,5,main]
2
processing 1 in Thread[ForkJoinPool.commonPool-worker-1,5,main]
1
但邮件和详细信息的顺序可能会有所不同.甚至可能在某些环境中,两个任务都可能碰巧由同一个线程执行,如果它可以在另一个线程开始拾取它之前完成第二个任务.但是,当然,如果任务足够昂贵,这种情况就不会发生.重要的一点是,整体工作负载已被拆分并入队,可能被其他工作线程拾取.
如果您的环境中针对上述简单示例执行单个线程的执行,您可以插入模拟工作负载,如下所示:
Stream.of(1, 2).parallel()
      .peek(x -> System.out.println("processing "+x+" in "+Thread.currentThread()))
      .map(x -> {
           LockSupport.parkNanos("simulated workload", TimeUnit.SECONDS.toNanos(3));
           return x;
        })
      .forEach(System.out::println);
然后,如果" 每个元素的处理时间 "足够高,您还可能会看到总执行时间将短于" 元素数量 "×" 每个元素的处理时间 ".
更新:误解可能是由于Brian Goetz的误导性陈述:"在您的情况下,您的输入集太小而无法分解".
必须强调的是,这不是Stream API的一般属性,而是Map已使用的属性.A HashMap有一个支持数组,条目根据它们的哈希码分布在该数组中.可能的情况是,将数组拆分为n个范围不会导致所包含元素的平衡拆分,尤其是如果只有两个.的实现者HashMap的Spliterator视为搜索数组元素得到完美的平衡拆分是过于昂贵,而不是分裂两种元素是不值得的.
由于HashMap默认容量是16,而且示例只有两个元素,我们可以说地图超大了.简单地修复它也会修复这个例子:
long start = System.nanoTime();
Map<String, Supplier<String>> input = new HashMap<>(2);
input.put("1", () -> {
    System.out.println(Thread.currentThread());
    LockSupport.parkNanos("simulated workload", TimeUnit.SECONDS.toNanos(2));
    return "a";
});
input.put("2", () -> {
    System.out.println(Thread.currentThread());
    LockSupport.parkNanos("simulated workload", TimeUnit.SECONDS.toNanos(2));
    return "b";
});
Map<String, String> results = input.keySet()
        .parallelStream().collect(Collectors.toConcurrentMap(
    key -> key,
    key -> input.get(key).get()));
System.out.println("Time: " + TimeUnit.NANOSECONDS.toMillis(System.nanoTime()- start));
在我的机器上,它打印
Thread[main,5,main]
Thread[ForkJoinPool.commonPool-worker-1,5,main]
Time: 2058
结论是,无论输入大小如何,Stream实现总是尝试使用并行执行(如果您请求它).但这取决于输入的结构,工作负载可以分配给工作线程的程度.事情可能更糟,例如,如果您从文件中流式传输线条.
如果您认为平衡分裂的好处值得复制步骤的成本,您也可以使用new ArrayList<>(input.keySet()).parallelStream()而不是input.keySet().parallelStream(),因为内部元素的分布ArrayList总是允许一个完全平衡的分割.