Mic*_*ael 7 concurrency scala actor
假设我必须可爱的几个CPU绑定任务.例如,如果我有4个CPU,我可能会创建一个4-5个工作线程的固定大小的线程池,等待队列并将任务放入队列中.在Java中,我可以使用java.util.concurrent(可能ThreadPoolExecutor)实现此机制.
你会如何用Scala演员实现它?
所有参与者基本上都是由调度程序执行的线程.调度程序创建一个线程池来执行大致绑定到您的内核数量的actor.这意味着您可以根据需要执行的每个任务创建一个actor,并将其余部分留给Scala:
for(i <- 1 to 20) {
actor {
print(i);
Thread.sleep(1000);
}
}
Run Code Online (Sandbox Code Playgroud)
这里的缺点取决于任务的数量,为每个任务创建线程的成本可能非常昂贵,因为线程在Java中并不便宜.
创建一个有界的worker actor池然后通过消息传递将任务分发给它们的简单方法是:
import scala.actors.Actor._
val numWorkers = 4
val pool = (1 to numWorkers).map { i =>
actor {
loop {
react {
case x: String => println(x)
}
}
}
}
for(i <- 1 to 20) {
val r = (new util.Random).nextInt(numWorkers)
pool(r) ! "task "+i
}
Run Code Online (Sandbox Code Playgroud)
我们想要创建多个actor的原因是因为一个actor一次只处理一个消息(即任务),所以为你需要创建多个任务获得并行性.
旁注:默认调度程序在I/O绑定任务中变得尤为重要,因为在这种情况下您肯定希望更改线程池的大小.两篇很好的博客文章详细介绍了这一点:探索Scala Actor的调度和Scala actor的线程池陷阱.
话虽如此,Akka是一个Actor框架,它为使用Actors的更高级工作流提供工具,这是我在任何实际应用中使用的.这是一个负载平衡(而不是随机)任务执行器:
import akka.actor.Actor
import Actor._
import akka.routing.{LoadBalancer, CyclicIterator}
class TaskHandler extends Actor {
def receive = {
case t: Task =>
// some computationally expensive thing
t.execute
case _ => println("default case is required in Akka...")
}
}
class TaskRouter(numWorkers: Int) extends Actor with LoadBalancer {
val workerPool = Vector.fill(numWorkers)(actorOf[TaskHandler].start())
val seq = new CyclicIterator(workerPool)
}
val router = actorOf(new TaskRouter(4)).start()
for(i <- 1 to 20) {
router ! Task(..)
}
Run Code Online (Sandbox Code Playgroud)
您可以使用不同类型的负载平衡(CyclicIterator是循环分配),因此您可以在此处查看文档以获取更多信息.
| 归档时间: |
|
| 查看次数: |
1083 次 |
| 最近记录: |