scala 延迟并行集合(可能吗?)

dsn*_*ode 5 parallel-processing scala stream

有没有办法在scala中并行运行流而不将所有对象加载到内存中?

注意:使用 par 方法,会将所有对象加载到内存中

val list = "a"::"b"::"c"::"d"::"e"::Nil //> list: List[String] = List(a, b, c, d, e)

val s = list.toStream  //> s: scala.collection.immutable.Stream[String] = Stream(a, ?)
val sq = s.par         //> sq: scala.collection.parallel.immutable.ParSeq[String] = ParVector(a, b, c, d, e)
sq.map { x => println("Map 1 "+x);x }
  .map { x => println("Map 2 "+x);x}
  .map { x => println("Map 3 "+x);x }
  .foreach { x => println("done "+x)}  
Run Code Online (Sandbox Code Playgroud)

Ric*_*ich 4

一般来说,是的,这是可能的。

正如 Tzach Zohar 评论的那样,“.par”运算符将急切地加载 Stream 的所有元素,因为“流本质上是顺序的,因为元素必须一个接一个地访问”(请参阅​​文档

因此,您不能为此使用内置的并行集合,但您仍然可以ExecutionContext直接使用并行处理流,例如:

import scala.concurrent._
import scala.concurrent.duration.Duration
import scala.concurrent.ExecutionContext.Implicits.global

val infStream = Stream.from(1)

val mappedInfStream = infStream
  .map { x => Future(println(s"processing $x on ${Thread.currentThread.getName}")) }

Await.result(
  Future.sequence(mappedInfStream.take(100)),
  Duration.Inf)
Run Code Online (Sandbox Code Playgroud)