使用Scala actor执行CPU绑定任务?

Mic*_*ael 7 concurrency scala actor

假设我必须可爱的几个CPU绑定任务.例如,如果我有4个CPU,我可能会创建一个4-5个工作线程的固定大小的线程池,等待队列并将任务放入队列中.在Java中,我可以使用java.util.concurrent(可能ThreadPoolExecutor)实现此机制.

你会如何用Scala演员实现它?

Dav*_*lin 9

所有参与者基本上都是由调度程序执行的线程.调度程序创建一个线程池来执行大致绑定到您的内核数量的actor.这意味着您可以根据需要执行的每个任务创建一个actor,并将其余部分留给Scala:

for(i <- 1 to 20) {
  actor {
    print(i); 
    Thread.sleep(1000);
  }
}
Run Code Online (Sandbox Code Playgroud)

这里的缺点取决于任务的数量,为每个任务创建线程的成本可能非常昂贵,因为线程在Java中并不便宜.

创建一个有界的worker actor池然后通过消息传递将任务分发给它们的简单方法是:

import scala.actors.Actor._

val numWorkers = 4
val pool = (1 to numWorkers).map { i =>
  actor { 
    loop { 
      react { 
        case x: String => println(x) 
      } 
    }
  }
}

for(i <- 1 to 20) {
  val r = (new util.Random).nextInt(numWorkers)
  pool(r) ! "task "+i
}
Run Code Online (Sandbox Code Playgroud)

我们想要创建多个actor的原因是因为一个actor一次只处理一个消息(即任务),所以为你需要创建多个任务获得并行性.

旁注:默认调度程序在I/O绑定任务中变得尤为重要,因为在这种情况下您肯定希望更改线程池的大小.两篇很好的博客文章详细介绍了这一点:探索Scala Actor的调度Scala actor的线程池陷阱.

话虽如此,Akka是一个Actor框架,它为使用Actors的更高级工作流提供工具,这是我在任何实际应用中使用的.这是一个负载平衡(而不是随机)任务执​​行器:

import akka.actor.Actor
import Actor._
import akka.routing.{LoadBalancer, CyclicIterator}

class TaskHandler extends Actor {
  def receive = {
    case t: Task =>
      // some computationally expensive thing
      t.execute
    case _ => println("default case is required in Akka...")
  }
}

class TaskRouter(numWorkers: Int) extends Actor with LoadBalancer {
   val workerPool = Vector.fill(numWorkers)(actorOf[TaskHandler].start())
   val seq = new CyclicIterator(workerPool)
}

val router = actorOf(new TaskRouter(4)).start()

for(i <- 1 to 20) {
  router ! Task(..)
}
Run Code Online (Sandbox Code Playgroud)

您可以使用不同类型的负载平衡(CyclicIterator是循环分配),因此您可以在此处查看文档以获取更多信息.