How to rewrite a for loop with shared dependencies using actors

Tho*_*nne 5 scala actor akka

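We have some code that needs to run faster. It has already been profiled, so we want to use multiple threads. Normally I would set up an in-memory queue and have a number of threads take jobs off the queue and compute the results. For the shared data I would use a ConcurrentHashMap or similar.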
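For reference, the "queue plus worker threads plus ConcurrentHashMap" approach described above might look roughly like the sketch below. It assumes a plain java.util.concurrent fixed thread pool (whose internal queue plays the role of the in-memory queue); the names CachingPriceCalculator and ThreadPoolValueTrades are hypothetical and not part of the original code.

```scala
import java.util.concurrent.{Callable, ConcurrentHashMap, Executors}

// Hypothetical caching variant of the question's PriceCalculator, safe to share across threads.
final class CachingPriceCalculator {
  private val cache = new ConcurrentHashMap[String, java.lang.Double]()

  def priceFor(stock: String): Double = {
    val cached = cache.get(stock)
    if (cached != null) cached.doubleValue()
    else {
      Thread.sleep(20) // the slow operation from the question
      val price = 50.0
      // Two threads may both miss and compute; putIfAbsent keeps whichever value was stored first.
      cache.putIfAbsent(stock, price)
      price
    }
  }
}

final case class Trade(price: Double, volume: Int, stock: String) {
  def value(calc: CachingPriceCalculator): Double = (calc.priceFor(stock) - price) * volume
}

object ThreadPoolValueTrades {
  // The fixed pool's internal queue is the "in-memory queue"; its threads are the workers.
  def valueAll(trades: List[Trade], calc: CachingPriceCalculator, threads: Int = 4): List[(Trade, Double)] = {
    val pool = Executors.newFixedThreadPool(threads)
    try {
      val pending = trades.map { t =>
        pool.submit(new Callable[(Trade, Double)] {
          def call(): (Trade, Double) = (t, t.value(calc))
        })
      }
      pending.map(_.get()) // blocks until each job finishes; results keep input order
    } finally pool.shutdown()
  }
}
```

Every thread touches the shared cache here, which is exactly the kind of shared mutable state the question hopes actors will remove.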

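I really don't want to go down that road again. From what I have read, using actors should lead to cleaner code, and if I use Akka, migrating to more than one JVM should be easier. Is that true?

However, I don't know how to think in actors, so I am not sure where to start.

To give a better idea of the problem, here is some sample code: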

case class Trade(price:Double, volume:Int, stock:String) {
  def value(priceCalculator:PriceCalculator) =
    (priceCalculator.priceFor(stock) - price) * volume
}
class PriceCalculator {
  def priceFor(stock:String) = {
    Thread.sleep(20)//a slow operation which can be cached
    50.0
  }
}
object ValueTrades {

  def valueAll(trades:List[Trade],
      priceCalculator:PriceCalculator):List[(Trade,Double)] = {
    trades.map { trade => (trade,trade.value(priceCalculator)) }
  }

  def main(args:Array[String]) {
    val trades = List(
      Trade(30.5, 10, "Foo"),
      Trade(30.5, 20, "Foo")
      //usually much longer
    )
    val priceCalculator = new PriceCalculator
    val values = valueAll(trades, priceCalculator)
  }

}

I would appreciate it if someone experienced with actors could suggest how to map this onto actors.

Rex*_*err 2

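For simple parallelization, where I throw a bunch of work out to be processed and then wait for all of it to come back, I tend to like a Futures pattern.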

class ActorExample {
  import actors._
  import Actor._
  class Worker(val id: Int) extends Actor {
    def busywork(i0: Int, i1: Int) = {
      var sum,i = i0
      while (i < i1) {
        i += 1
        sum += 42*i
      }
      sum
    }
    def act() { loop { react {
      case (i0:Int,i1:Int) => sender ! busywork(i0,i1)
      case None => exit()
    }}}
  }

  val workforce = (1 to 4).map(i => new Worker(i)).toList

  def parallelFourSums = {
    workforce.foreach(_.start())
    val futures = workforce.map(w => w !! ((w.id,1000000000)) );
    val computed = futures.map(f => f() match {
      case i:Int => i
      case _ => throw new IllegalArgumentException("I wanted an int!")
    })
    workforce.foreach(_ ! None)
    computed
  }

  def serialFourSums = {
    val solo = workforce.head
    workforce.map(w => solo.busywork(w.id,1000000000))
  }

  def timed(f: => List[Int]) = {
    val t0 = System.nanoTime
    val result = f
    val t1 = System.nanoTime
    (result, t1-t0)
  }

  def go {
    val serial = timed( serialFourSums )
    val parallel = timed( parallelFourSums )
    println("Serial result:  " + serial._1)
    println("Parallel result:" + parallel._1)
    printf("Serial took   %.3f seconds\n",serial._2*1e-9)
    printf("Parallel took %.3f seconds\n",parallel._2*1e-9)
  }
}

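Basically, the idea is to create a set of workers (one per workload) and then throw all the data at them with !!, which immediately gives back a future. When you try to read the future, the sender blocks until the worker has actually finished processing the data.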
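The same fire-off-then-wait idea can be sketched with scala.concurrent.Future from the standard library instead of scala.actors (which later Scala versions no longer ship). This is an analogue of the answer's code, not a translation of it: Future(...) plays the role of !!, and Await.result on Future.sequence plays the role of calling f() on every future. The object name FutureSums is hypothetical.

```scala
import scala.concurrent.{Await, Future}
import scala.concurrent.duration._
import scala.concurrent.ExecutionContext.Implicits.global

object FutureSums {
  // Same arithmetic as the answer's Worker.busywork (Int overflow included).
  def busywork(i0: Int, i1: Int): Int = {
    var sum = i0
    var i = i0
    while (i < i1) {
      i += 1
      sum += 42 * i
    }
    sum
  }

  // Each Future(...) returns immediately, like `w !! msg` in the answer.
  def parallelSums(ids: List[Int], upTo: Int): List[Int] = {
    val futures: List[Future[Int]] = ids.map(id => Future(busywork(id, upTo)))
    // Block until every job has finished, like `f()` on each future in the answer.
    Await.result(Future.sequence(futures), Duration.Inf)
  }

  def serialSums(ids: List[Int], upTo: Int): List[Int] = ids.map(id => busywork(id, upTo))
}
```

Future.sequence preserves the order of the input list, so the parallel and serial results can be compared directly.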

You could rewrite the code above so that PriceCalculator extends Actor, and valueAll coordinates the return of the data.
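A minimal sketch of that idea follows. To stay dependency-free it does not use Akka or scala.actors; PriceActor is a hypothetical hand-rolled actor (one mailbox, one thread, private state), and a Promise stands in for the reply. Trades are routed to a fixed pool of PriceActors by stock name, so each stock's cache lives in exactly one actor and needs no locking, while different stocks can still be priced in parallel.

```scala
import java.util.concurrent.LinkedBlockingQueue
import scala.collection.mutable
import scala.concurrent.{Await, Future, Promise}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration._

final case class Trade(price: Double, volume: Int, stock: String)

// Messages understood by the hypothetical PriceActor.
sealed trait PriceMsg
final case class PriceFor(stock: String, replyTo: Promise[Double]) extends PriceMsg
case object Stop extends PriceMsg

// Hand-rolled actor: one mailbox drained by one thread, so `cache` is never shared.
final class PriceActor {
  private val mailbox = new LinkedBlockingQueue[PriceMsg]()
  private val cache = mutable.Map.empty[String, Double] // only touched by `thread`

  private val thread = new Thread(new Runnable {
    def run(): Unit = {
      var running = true
      while (running) {
        mailbox.take() match {
          case PriceFor(stock, replyTo) =>
            replyTo.success(cache.getOrElseUpdate(stock, slowPrice(stock)))
          case Stop =>
            running = false
        }
      }
    }
  })
  thread.setDaemon(true)
  thread.start()

  private def slowPrice(stock: String): Double = {
    Thread.sleep(20) // the slow operation from the question, now cached per actor
    50.0
  }

  def !(msg: PriceMsg): Unit = mailbox.put(msg) // asynchronous send
  def join(): Unit = thread.join()
}

object ActorValueTrades {
  // valueAll only sends messages and combines the replies; it owns no shared state.
  def valueAll(trades: List[Trade], actors: Vector[PriceActor]): List[(Trade, Double)] = {
    val futures = trades.map { t =>
      val reply = Promise[Double]()
      // Route by stock so the same stock always goes to the same actor (and cache).
      actors(Math.floorMod(t.stock.hashCode, actors.size)) ! PriceFor(t.stock, reply)
      reply.future.map(p => (t, (p - t.price) * t.volume))
    }
    Await.result(Future.sequence(futures), 10.seconds)
  }
}
```

With Akka, the Promise-based reply would correspond roughly to the ask pattern, and the routing by stock to a router or to one child actor per stock; the structure of valueAll would stay much the same.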

Note that you have to be careful when passing around data that is not immutable.
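The reason is that sending a message only passes a reference. The single-threaded sketch below uses a LinkedBlockingQueue as a stand-in mailbox (an assumption; no actor library is involved) and a hypothetical MutableTrade class for contrast. It shows that a mutable message changed after "sending" is seen changed by the receiver, whereas an immutable case class cannot be altered behind the receiver's back. With a real actor on another thread, the receiver could see the change at an unpredictable point, which is worse.

```scala
import java.util.concurrent.LinkedBlockingQueue

// Hypothetical mutable variant of the question's Trade, for contrast.
final class MutableTrade(var price: Double, var volume: Int, var stock: String)
final case class Trade(price: Double, volume: Int, stock: String)

object MessageSharingDemo {
  // Returns (price seen for the mutable message, price seen for the immutable one, price of the updated copy).
  def run(): (Double, Double, Double) = {
    val mailbox = new LinkedBlockingQueue[AnyRef]()

    val mutableTrade = new MutableTrade(30.5, 10, "Foo")
    mailbox.put(mutableTrade) // enqueues a reference, not a snapshot
    mutableTrade.price = 99.0 // the sender keeps mutating after "sending"
    val seenMutable = mailbox.take().asInstanceOf[MutableTrade].price

    val trade = Trade(30.5, 10, "Foo")
    mailbox.put(trade)
    val updated = trade.copy(price = 99.0) // builds a new object; the queued one is unchanged
    val seenImmutable = mailbox.take().asInstanceOf[Trade].price

    (seenMutable, seenImmutable, updated.price)
  }
}
```

The question's Trade is already a case class with only vals, so it is safe to send as-is.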

Anyway, on the machine I'm typing this on, running the above gives:

scala> (new ActorExample).go
Serial result:  List(-1629056553, -1629056636, -1629056761, -1629056928)
Parallel result:List(-1629056553, -1629056636, -1629056761, -1629056928)
Serial took   1.532 seconds
Parallel took 0.443 seconds

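(Obviously I have at least four cores; the parallel timing varies somewhat depending on which worker gets which processor and what else is going on on the machine.)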
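The "at least four cores" conclusion follows from the timings above. Assuming the work is CPU-bound, so the speedup S cannot exceed the number p of hardware threads running workers at the same time, the arithmetic is roughly:

```latex
S = \frac{T_{\text{serial}}}{T_{\text{parallel}}} = \frac{1.532\ \text{s}}{0.443\ \text{s}} \approx 3.46,
\qquad S \le p \text{ and } S > 3 \;\Rightarrow\; p \ge 4,
\qquad \text{efficiency on 4 cores} \approx \frac{3.46}{4} \approx 0.86
```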