Bee*_*ope 4 java concurrency java-stream
我有一个缓慢的CPU密集型操作:doWork(int x)
使用具有不同值的单个整数参数调用,如下所示:
static String doWork(int x) {
// do work that depends on i, takes ~60 seconds
...
}
public static void main(String args[]) {
for (int i = 1; i < 100; i++) {
System.println(doWork(i));
}
}
Run Code Online (Sandbox Code Playgroud)
每次doWork()
调用完成后,结果都会输出到控制台.我想并行化这个 - 所有的doWork()
调用都是独立的,不会改变任何共享状态.现在,我可以做到这一点的老办法,瞎搞与ExecutorSevice
和Future.get()
等,但我想用流做更干净1.
所以像这样的东西似乎几乎可以工作:
public static void main(String args[]) {
IntStream.rangeClosed(1, 100).parallel()
.forEach(i -> System.out.println(doWork(i)));
}
Run Code Online (Sandbox Code Playgroud)
...但问题是我想保留控制台上的输出顺序(doWork(1)
应该先行,依此类推).我无法使用forEachOrdered()
因为序列化整个操作:只使用一个线程.问题的根源在于forEachOrdered
提供了太强有力的保证:一次在一个元素上顺序调用consumer方法.我希望并行调用消费者,但输出是有序的.
所以我应该看一下map -> collect
类型习惯用法,我将每次doWork()
调用的输出收集到一个字符串中并打印一次:
public static void main(String[] args) {
System.out.println(IntStream.rangeClosed(1, 100).parallel()
.mapToObj(Main::doWork).collect(Collectors.joining("\n")));
}
Run Code Online (Sandbox Code Playgroud)
几乎!该collect()
方法保持遭遇顺序,因此我的元素是有序的.现在的问题是没有增量输出 - 整个工作必须在任何输出发生之前完成.我真的想保留更新运行到控制台上的行为.
我想我想要某种有序的消费终端操作,这不会强制整个管道被订购.基本上它会像普通收集器一样在内部收集结果,但是当收集当前"最左边"元素时,它会将其传递给消费者 - 因此消费者会看到有序元素流,但所有内容仍然是并行发生的.
那里有什么东西吗?似乎不可能在现有Collector
接口上构建它,因为它没有为您提供确定元素顺序的方法.
1 ...也许更有效率,因为在封面下使用fork/join,所以也许我可以利用该框架中内置的一些启发式方法?
你很亲密.只需结合使用map
和forEachOrdered
解决方案:
IntStream.rangeClosed(1, 100)
.parallel()
.mapToObj(Main::doWork)
.forEachOrdered(System.out::println);
Run Code Online (Sandbox Code Playgroud)
归档时间: |
|
查看次数: |
105 次 |
最近记录: |