pyt*_*nic 1 scala bigdata apache-spark
所以,我有一部分代码如下所示.
for(a <- 0 until segments)
{
// do something with elements at index a
}
Run Code Online (Sandbox Code Playgroud)
什么是最简单的多线程类型,它允许我在一个单独的线程中执行该循环的每个元素.我还有每个线程需要访问的全局集合(由a索引).我在互联网上看到的只是期货和演员,但它们很难掌握.我想要一些像OpenMP一样简单的东西.
我试过以下,但它给了我错误,';' expected but '.' found. }} thread.start.
for (a <- 0 until segments) {
val thread = new Thread {
override def run {
// do something with elements at index a
}} thread.start
}
Run Code Online (Sandbox Code Playgroud)
我认为你正在执行代码只是为了副作用.
以下代码有效.您收到错误,因为它thread.start应该在一个单独的行中.
for (a <- 0 until 10) {
val thread = new Thread {
override def run(): Unit = {
// code
}
}
thread.start()
}
Run Code Online (Sandbox Code Playgroud)
您可以使用期货替换它,以使用比产生可能大量线程更好的线程池.
for (a <- 0 until 10) {
Future {
// code
}
}
Run Code Online (Sandbox Code Playgroud)
但无论哪种情况,你都无法等到它完成.您可以遍历范围并返回Future包含您使用第二个函数开始的每个期货的所有部分结果的列表.
val result: Future[List[Unit]] = Future.traverse((0 until 10).toList)(index => Future {
// code
})
Run Code Online (Sandbox Code Playgroud)
有Future结果你可以等待它,即阻塞调用线程直到所有计算完成.
import scala.concurrent.duration._
import scala.concurrent.Await
Await.ready(result, 1.hour)
Run Code Online (Sandbox Code Playgroud)
使用Await.result得到的结果,Await.ready只是等待,直到将来完成.
要等待线程,您需要在启动它们时构建一个列表,然后join在每个线程上调用.
我认为你不需要改变现有的代码来使用并行集合,但我可能不知道你的代码特有的东西.以下代码将范围拆分为将并行处理的块.
for (a <- (0 until 10).par) {
// code
}
Run Code Online (Sandbox Code Playgroud)
使用并行集合,您无需等待,线程将被阻止,直到处理完所有内容.
| 归档时间: |
|
| 查看次数: |
1482 次 |
| 最近记录: |