如何防止演员在其他长期演员面前挨饿?

Col*_*lin 5 concurrency scala scala-2.8 actor

这是使用Scala 2.8 Actors.我有一个可以并行化的长期工作.它包括大约65万单位的工作.我将它划分为2600个不同的单独子任务,并为每个子任务创建一个新的actor:

actor {
  val range = (0L to total by limit)
  val latch = new CountDownLatch(range.length)
  range.foreach { offset =>
    actor {
      doExpensiveStuff(offset,limit)
      latch.countDown
    }
  }
  latch.await
}
Run Code Online (Sandbox Code Playgroud)

这种方法效果很好,但总体上需要2 + h才能完成.问题在于,与此同时,我创建的任何其他演员都做正常的任务似乎被最初的2600名演员挨饿,这些演员也耐心地等待他们的时间在一个线程上运行但是等待的时间比任何新的演员要长.一起来.

我怎么能避免这种饥饿呢?

初步想法:

  • 而不是2600名演员,使用一个演员,顺序通过大量的工作.我不喜欢这个,因为我希望通过拆分它来尽快完成这项工作.
  • 而不是2600个演员,使用两个演员,每个演员处理整个工作集的不同一半.这可能会更好,但如果我的机器有8个核心怎么办?我可能想要利用更多.

UPDATE

有些人质疑Actors的使用,特别是因为消息传递能力没有在工作者中使用.我假设Actor是一个非常轻量级的抽象,围绕一个ThreadPool处于或接近相同的性能级别,只需手动编写基于ThreadPool的执行.所以我写了一个基准:

import testing._
import java.util.concurrent._
import actors.Futures._

val count = 100000
val poolSize = 4
val numRuns = 100

val ActorTest = new Benchmark {
  def run = {
    (1 to count).map(i => future {
      i * i
    }).foreach(_())
  }
}

val ThreadPoolTest = new Benchmark {
  def run = {
    val queue = new LinkedBlockingQueue[Runnable]
    val pool = new ThreadPoolExecutor(
          poolSize, poolSize, 1, TimeUnit.SECONDS, queue)
    val latch = new CountDownLatch(count)
    (1 to count).map(i => pool.execute(new Runnable {
      override def run = {
        i * i
        latch.countDown
      }
    }))
    latch.await
  }
}

List(ActorTest,ThreadPoolTest).map { b =>
  b.runBenchmark(numRuns).sum.toDouble / numRuns
}

// List[Double] = List(545.45, 44.35)
Run Code Online (Sandbox Code Playgroud)

我在ActorTest中使用了Future抽象来避免将消息传递回另一个actor以表明工作已完成.我惊讶地发现我的Actor代码慢了10倍.请注意,我还创建了我的ThreadPoolExecutor,其初始池大小用于创建默认的Actor池.

回想起来,似乎我可能过度使用了Actor抽象.我将考虑使用单独的ThreadPools来完成这些独特,昂贵,长时间运行的任务.

Vas*_*iuk 6

不管你有多少演员都有,如果你不明确地配置您的日程安排,所有的人都支持一个单一的叉/加入调度(针对一个线程池运行容量4,如果我没有记错的话).这就是饥饿来自的地方.

  1. 您应该为您的actor池尝试不同的调度程序,以找到显示最佳性能的调度程序(如果您希望使用尽可能多的线程来最大化并行性,请尝试使用ResizableThreadPoolScheduler)
  2. 您需要为庞大的演员池(系统中的其他演员不使用它)设置单独的调度程序
  3. 正如@DaGGeRRz所建议的那样,您可以尝试提供可配置调度程序的Akka框架(例如,工作窃取负载平衡调度程序将事件从繁忙的actor的邮箱移动到空闲的actor)

从注释到默认的Actor实现:

可以将运行时系统配置为使用更大的线程池大小(例如,通过设置 actors.corePoolSizeJVM属性). 可以重写特征的scheduler方法Actor以返回a ResizableThreadPoolScheduler,其调整其线程池的大小以避免由调用任意阻塞方法的actor引起的饥饿.该 actors.enableForkJoinJVM属性可以被设置为false,在这种情况下 ResizableThreadPoolScheduler,使用默认执行的行动者.

另外:scala-lang 上调度程序的一个有趣的线程.

  • Vasil对于线程使用是正确的.我错误地认为由线程块短手形式创建的actor会为每个actor生成一个线程,但正如他所说,它们都是从Scala Actor线程池运行的.删除我的答案,因为瓦西尔更好地涵盖了它. (2认同)