yur*_*ura 41 concurrency multithreading scala
我有50,000个任务,并希望用10个线程执行它们.在Java中我应该创建Executers.threadPool(10)并传递runnable然后等待处理所有.据我所知,Scala对该任务特别有用,但我无法在docs中找到解决方案.
mpi*_*ist 58
最简单的方法是使用scala.concurrent.Future
类和相关的基础结构.该scala.concurrent.future
方法异步评估传递给它的块并立即返回Future[A]
表示异步计算的块.期货可以通过多种非阻塞方式进行操作,包括映射,flatMapping,过滤,恢复错误等.
例如,这是一个创建10个任务的示例,其中每个任务都会休眠任意数量的时间,然后返回传递给它的值的平方.
import scala.concurrent.duration._
import scala.concurrent.ExecutionContext.Implicits.global
val tasks: Seq[Future[Int]] = for (i <- 1 to 10) yield future {
println("Executing task " + i)
Thread.sleep(i * 1000L)
i * i
}
val aggregated: Future[Seq[Int]] = Future.sequence(tasks)
val squares: Seq[Int] = Await.result(aggregated, 15.seconds)
println("Squares: " + squares)
Run Code Online (Sandbox Code Playgroud)
在这个例子中,我们首先创建一系列单独的异步任务,完成后提供一个int.然后Future.sequence
,我们将这些异步任务组合到一个异步任务中 - 交换类型Future
和位置Seq
.最后,我们在等待结果的同时阻止当前线程长达15秒.在该示例中,我们使用全局执行上下文,该上下文由fork/join线程池支持.对于非平凡的示例,您可能希望使用特定于应用程序ExecutionContext
.
通常,应尽可能避免阻塞.有可用的其他组合子Future
类,它可以帮助程序在异步风格,其中包括onSuccess
,onFailure
,和onComplete
.
另外,考虑调查Akka库,它为Scala和Java提供基于actor的并发性,并与之互操作scala.concurrent
.
这种最简单的方法是使用Scala的Future类,它是Actors框架的子组件.scala.actors.Futures.future方法为传递给它的块创建一个Future.然后,您可以使用scala.actors.Futures.awaitAll等待所有任务完成.
例如,这是一个创建10个任务的示例,其中每个任务都会休眠任意数量的时间,然后返回传递给它的值的平方.
import scala.actors.Futures._
val tasks = for (i <- 1 to 10) yield future {
println("Executing task " + i)
Thread.sleep(i * 1000L)
i * i
}
val squares = awaitAll(20000L, tasks: _*)
println("Squares: " + squares)
Run Code Online (Sandbox Code Playgroud)
Jan*_*anx 16
您想要查看Scala actor库或Akka.Akka具有更清晰的语法,但要么会做到这一点.
因此,您需要创建一个知道如何处理任务的演员池.一个Actor基本上可以是任何带有receive方法的类 - 来自Akka教程(http://doc.akkasource.org/tutorial-chat-server-scala):
class MyActor extends Actor {
def receive = {
case "test" => println("received test")
case _ => println("received unknown message")
}}
val myActor = Actor.actorOf[MyActor]
myActor.start
Run Code Online (Sandbox Code Playgroud)
您将需要创建一个actor实例池,并将您的任务作为消息发送给它们.这里有关于Akka actor池的帖子可能会有所帮助:http://vasilrem.com/blog/software-development/flexible-load-balancing-with-akka-in-scala/
在您的情况下,每个任务一个actor可能是合适的(与线程相比,actor非常轻量级,因此您可以在一个VM中拥有很多),或者您可能需要在它们之间进行更复杂的负载平衡.
编辑:使用上面的示例演员,发送消息就像这样简单:
myActor ! "test"
Run Code Online (Sandbox Code Playgroud)
然后,演员将"接收测试"输出到标准输出.
消息可以是任何类型,当与Scala的模式匹配结合使用时,您可以使用强大的模式来构建灵活的并发应用程序.
一般来说,Akka演员将在线程共享方面"做正确的事",而对于OP的需求,我想默认是好的.但是如果需要,可以将actor应该使用的调度程序设置为以下几种类型之一:
* Thread-based
* Event-based
* Work-stealing
* HawtDispatch-based event-driven
Run Code Online (Sandbox Code Playgroud)
为演员设置调度员是微不足道的:
class MyActor extends Actor {
self.dispatcher = Dispatchers.newExecutorBasedEventDrivenDispatcher("thread-pool-dispatch")
.withNewThreadPoolWithBoundedBlockingQueue(100)
.setCorePoolSize(10)
.setMaxPoolSize(10)
.setKeepAliveTimeInMillis(10000)
.build
}
Run Code Online (Sandbox Code Playgroud)
见http://doc.akkasource.org/dispatchers-scala
通过这种方式,您可以限制线程池大小,但同样,使用默认调度程序的50K Akka actor实例可能会满足原始用例,并且它可以很好地并行化.
这实际上只是触及了Akka可以做的事情的表面.它带来了许多Erlang为Scala语言提供的功能.Actor可以监视其他actor并重新启动它们,从而创建自我修复的应用程序.Akka还提供软件事务内存和许多其他功能.它可以说是Scala的"杀手级应用"或"杀手级框架".
如果你想"用10个线程执行它们",那么使用线程.Scala的演员模型,通常是人们在说Scala对并发性有利时所说的,隐藏了这些细节,所以你不会看到它们.
使用actor或者你拥有的所有东西都是简单的计算,你只需要创建50000个并让它们运行.您可能面临问题,但它们具有不同的性质.
这是另一个类似于mpilquist的响应的答案,但没有弃用的API,包括通过自定义ExecutionContext的线程设置:
import java.util.concurrent.Executors
import scala.concurrent.{ExecutionContext, Await, Future}
import scala.concurrent.duration._
val numJobs = 50000
var numThreads = 10
// customize the execution context to use the specified number of threads
implicit val ec = ExecutionContext.fromExecutor(Executors.newFixedThreadPool(numThreads))
// define the tasks
val tasks = for (i <- 1 to numJobs) yield Future {
// do something more fancy here
i
}
// aggregate and wait for final result
val aggregated = Future.sequence(tasks)
val oneToNSum = Await.result(aggregated, 15.seconds).sum
Run Code Online (Sandbox Code Playgroud)