多个Scala actor为一项任务提供服务

Ral*_*lph 2 concurrency scala simd actor

我需要并行处理多个数据值("SIMD").我可以使用java.util.concurrentAPI(Executors.newFixedThreadPool())使用Future实例以并行方式处理多个值:

import java.util.concurrent.{Executors, Callable}

class ExecutorsTest {
  private class Process(value: Int)
      extends Callable[Int] {
    def call(): Int = {
      // Do some time-consuming task
      value
    }
  }

  val executorService = {
    val threads = Runtime.getRuntime.availableProcessors
    Executors.newFixedThreadPool(threads)
  }

  val processes = for (process <- 1 to 1000) yield new Process(process)

  val futures = executorService.invokeAll(processes)

  // Wait for futures
}
Run Code Online (Sandbox Code Playgroud)

如何使用Actors做同样的事情?我不相信我想把所有进程"提供"给一个actor,因为actor会依次执行它们.

我是否需要使用"调度程序"actor创建多个"处理器"角色,该角色向每个"处理器"角色发送相同数量的进程?

Rex*_*err 10

如果您只是想要点火,那么为什么不使用Scala期货呢?

import scala.actors.Futures._
def example = {
  val answers = (1 to 4).map(x => future {
    Thread.sleep(x*1000)
    println("Slept for "+x)
    x
  })
  val t0 = System.nanoTime
  awaitAll(1000000,answers: _*)  // Number is timeout in ms
  val t1 = System.nanoTime
  printf("%.3f seconds elapsed\n",(t1-t0)*1e-9)
  answers.map(_()).sum
}

scala> example
Slept for 1
Slept for 2
Slept for 3
Slept for 4
4.000 seconds elapsed
res1: Int = 10
Run Code Online (Sandbox Code Playgroud)

基本上,你所做的只是将你想要的代码放在一个future { }块中,它会立即返回一个未来; 应用它来获得答案(它将阻止直到完成),或者使用awaitAll超时等待每个人都完成.


更新:从2.11开始,实现此目的的方法是scala.concurrent.Future.上述代码的翻译是:

import scala.concurrent._
import duration._
import ExecutionContext.Implicits.global

def example = {
  val answers = Future.sequence(
    (1 to 4).map(x => Future {
      Thread.sleep(x*1000)
      println("Slept for "+x)
      x
    })
  )
  val t0 = System.nanoTime
  val completed = Await.result(answers, Duration(1000, SECONDS))
  val t1 = System.nanoTime
  printf("%.3f seconds elapsed\n",(t1-t0)*1e-9)
  completed.sum
}
Run Code Online (Sandbox Code Playgroud)