并行运行操作,同时保留有序的增量输出

Bee*_*ope 4 java concurrency java-stream

我有一个缓慢的CPU密集型操作:doWork(int x)使用具有不同值的单个整数参数调用,如下所示:

static String doWork(int x) {
  // do work that depends on i, takes ~60 seconds
  ...
}

public static void main(String args[]) {
  for (int i = 1; i < 100; i++) {
    System.println(doWork(i));
  }
}
Run Code Online (Sandbox Code Playgroud)

每次doWork()调用完成后,结果都会输出到控制台.我想并行化这个 - 所有的doWork()调用都是独立的,不会改变任何共享状态.现在,我可以做到这一点的老办法,瞎搞与ExecutorSeviceFuture.get()等,但我想用流做更干净1.

所以像这样的东西似乎几乎可以工作:

public static void main(String args[]) {
    IntStream.rangeClosed(1, 100).parallel()
        .forEach(i -> System.out.println(doWork(i)));
}
Run Code Online (Sandbox Code Playgroud)

...但问题是我想保留控制台上的输出顺序(doWork(1)应该先行,依此类推).我无法使用forEachOrdered()因为序列化整个操作:只使用一个线程.问题的根源在于forEachOrdered提供了太强有力的保证:一次在一个元素上顺序调用consumer方法.我希望并行调用消费者,但输出是有序的.

所以我应该看一下map -> collect类型习惯用法,我将每次doWork()调用的输出收集到一个字符串中并打印一次:

public static void main(String[] args) {
    System.out.println(IntStream.rangeClosed(1, 100).parallel()
        .mapToObj(Main::doWork).collect(Collectors.joining("\n")));
}
Run Code Online (Sandbox Code Playgroud)

几乎!该collect()方法保持遭遇顺序,因此我的元素是有序的.现在的问题是没有增量输出 - 整个工作必须在任何输出发生之前完成.我真的想保留更新运行到控制台上的行为.

我想我想要某种有序的消费终端操作,这不会强制整个管道被订购.基本上它会像普通收集器一样在内部收集结果,但是当收集当前"最左边"元素时,它会将其传递给消费者 - 因此消费者会看到有序元素流,但所有内容仍然是并行发生的.

那里有什么东西吗?似乎不可能在现有Collector接口上构建它,因为它没有为您提供确定元素顺序的方法.


1 ...也许更有效率,因为在封面下使用fork/join,所以也许我可以利用该框架中内置的一些启发式方法?

shm*_*sel 6

你很亲密.只需结合使用mapforEachOrdered解决方案:

IntStream.rangeClosed(1, 100)
         .parallel()
         .mapToObj(Main::doWork)
         .forEachOrdered(System.out::println);
Run Code Online (Sandbox Code Playgroud)

  • 哈,真棒.有用.我曾经想过它,但不知何故说它不起作用,因为`forEachOrdered`使整个操作串行 - 但它没有,因为它只需要串行调用消费者(`println`),但其他部分可以并行.咄. (2认同)