che*_*gpu 5 parallel-processing concurrency scala
我参加了Parallel Programming 课程,它显示了并行接口:
def parallel[A, B](taskA: => A, taskB: => B): (A, B) = {
val ta = taskA
val tb = task {taskB}
(ta, tb.join())
}
Run Code Online (Sandbox Code Playgroud)
以下是错误的:
def parallel[A, B](taskA: => A, taskB: => B): (A, B) = {
val ta = taskB
val tb = task {taskB}.join()
(ta, tb)
}
Run Code Online (Sandbox Code Playgroud)
更多界面见https://gist.github.com/ChenZhongPu/fe389d30626626294306264a148bd2aa
它还向我们展示了执行四个任务的正确方法:
def parallel[A, B, C, D](taskA: => A, taskB: => B, taskC: => C, taskD: => D): (A, B, C, D) = {
val ta = task { taskA }
val tb = task { taskB }
val tc = task { taskC }
val td = taskD
(ta.join(), tb.join(), tc.join(), td)
}
Run Code Online (Sandbox Code Playgroud)
我的问题:如果我不知道提前的任务数量(任务列表),我该如何join
正确调用每个任务?
tasks.map(_.join()) // wrong
Run Code Online (Sandbox Code Playgroud)
编辑
类似的讨论也发生在本周的讨论模块:并行编程
您可以像这样实现该方法:
def parallel[A](tasks: (() => A)*): Seq[A] = {
if (tasks.isEmpty) Nil
else {
val pendingTasks = tasks.tail.map(t => task { t() })
tasks.head() +: pendingTasks.map(_.join())
}
}
Run Code Online (Sandbox Code Playgroud)
(请注意,您不能拥有可变数量的按名称参数- 尽管这可以更改)
然后像这样使用它:
object ParallelUsage {
def main(args: Array[String]) {
val start = System.currentTimeMillis()
// Use a list of tasks:
val tasks = List(longTask _, longTask _, longTask _, longTask _)
val results = parallel(tasks: _*)
println(results)
// or pass any number of individual tasks directly:
println(parallel(longTask, longTask, longTask))
println(parallel(longTask, longTask))
println(parallel(longTask))
println(parallel())
println(s"Done in ${ System.currentTimeMillis() - start } ms")
}
def longTask() = {
println("starting longTask execution")
Thread.sleep(1000)
42 + Math.random
}
}
Run Code Online (Sandbox Code Playgroud)
你不能比这更简单了:
val tasks = Vector(longTask _, longTask _, longTask _)
val results = tasks.par.map(_()).seq
Run Code Online (Sandbox Code Playgroud)