Java8 - 以并行方式处理Stream <Callable <... >>以非线程安全的消费者的惯用方法?

Ale*_*x R 5 java parallel-processing java-8 java-stream

假设我有一个Stream<Callable<SomeClass>> stream;.该流正在访问超过一百万个不适合内存的对象.

将此转换为a的惯用方法是什么Stream<SomeClass>,以确保在Callable::call传递给非线程安全的消费者(可能通过调用.sequential().forEach()或其他一些瓶颈机制)之前并行执行?

即并行处理流,但顺序传递输出(随机顺序ok,只要它是单线程).

我知道我可以通过在原始流和消费者之间建立一个ExecutionService和一个来做我想做的事Queue.但这似乎是很多代码,是否有一个神奇的单行程?

Ale*_*x R 0

其他答案都不适合我。

我最终决定了这样的事情(伪代码):

ExecutorService executor = Executors.newWorkStealingPool();
CompletionService completor = new CompletionService(executor);
int count = stream.map(completor::submit).count();
while(count-- > 0) {
  SomeClass obj = completor.take();
  consume(obj);
}
Run Code Online (Sandbox Code Playgroud)

consume(obj)循环在单个线程中顺序执行,而各个可调用任务通过 CompletionService 的多个线程异步工作。内存消耗是有限的,因为CompletionService一次正在进行的项目数量与可用线程的数量相同。等待执行的 Callable 会急切地从流中具体化,但与开始执行后每个消耗的内存相比,其影响可以忽略不计(您的用例可能会有所不同)。