Ale*_*x R 5 java parallel-processing java-8 java-stream
假设我有一个Stream<Callable<SomeClass>> stream;.该流正在访问超过一百万个不适合内存的对象.
将此转换为a的惯用方法是什么Stream<SomeClass>,以确保在Callable::call传递给非线程安全的消费者(可能通过调用.sequential().forEach()或其他一些瓶颈机制)之前并行执行?
即并行处理流,但顺序传递输出(随机顺序ok,只要它是单线程).
我知道我可以通过在原始流和消费者之间建立一个ExecutionService和一个来做我想做的事Queue.但这似乎是很多代码,是否有一个神奇的单行程?
其他答案都不适合我。
我最终决定了这样的事情(伪代码):
ExecutorService executor = Executors.newWorkStealingPool();
CompletionService completor = new CompletionService(executor);
int count = stream.map(completor::submit).count();
while(count-- > 0) {
SomeClass obj = completor.take();
consume(obj);
}
Run Code Online (Sandbox Code Playgroud)
该consume(obj)循环在单个线程中顺序执行,而各个可调用任务通过 CompletionService 的多个线程异步工作。内存消耗是有限的,因为CompletionService一次正在进行的项目数量与可用线程的数量相同。等待执行的 Callable 会急切地从流中具体化,但与开始执行后每个消耗的内存相比,其影响可以忽略不计(您的用例可能会有所不同)。