Col*_*lin 5 concurrency scala scala-2.8 actor
这是使用Scala 2.8 Actors.我有一个可以并行化的长期工作.它包括大约65万单位的工作.我将它划分为2600个不同的单独子任务,并为每个子任务创建一个新的actor:
actor {
val range = (0L to total by limit)
val latch = new CountDownLatch(range.length)
range.foreach { offset =>
actor {
doExpensiveStuff(offset,limit)
latch.countDown
}
}
latch.await
}
Run Code Online (Sandbox Code Playgroud)
这种方法效果很好,但总体上需要2 + h才能完成.问题在于,与此同时,我创建的任何其他演员都做正常的任务似乎被最初的2600名演员挨饿,这些演员也耐心地等待他们的时间在一个线程上运行但是等待的时间比任何新的演员要长.一起来.
我怎么能避免这种饥饿呢?
初步想法:
UPDATE
有些人质疑Actors的使用,特别是因为消息传递能力没有在工作者中使用.我假设Actor是一个非常轻量级的抽象,围绕一个ThreadPool处于或接近相同的性能级别,只需手动编写基于ThreadPool的执行.所以我写了一个基准:
import testing._
import java.util.concurrent._
import actors.Futures._
val count = 100000
val poolSize = 4
val numRuns = 100
val ActorTest = new Benchmark {
def run = {
(1 to count).map(i => future {
i * i
}).foreach(_())
}
}
val ThreadPoolTest = new Benchmark {
def run = {
val queue = new LinkedBlockingQueue[Runnable]
val pool = new ThreadPoolExecutor(
poolSize, poolSize, 1, TimeUnit.SECONDS, queue)
val latch = new CountDownLatch(count)
(1 to count).map(i => pool.execute(new Runnable {
override def run = {
i * i
latch.countDown
}
}))
latch.await
}
}
List(ActorTest,ThreadPoolTest).map { b =>
b.runBenchmark(numRuns).sum.toDouble / numRuns
}
// List[Double] = List(545.45, 44.35)
Run Code Online (Sandbox Code Playgroud)
我在ActorTest中使用了Future抽象来避免将消息传递回另一个actor以表明工作已完成.我惊讶地发现我的Actor代码慢了10倍.请注意,我还创建了我的ThreadPoolExecutor,其初始池大小用于创建默认的Actor池.
回想起来,似乎我可能过度使用了Actor抽象.我将考虑使用单独的ThreadPools来完成这些独特,昂贵,长时间运行的任务.
不管你有多少演员都有,如果你不明确地配置您的日程安排,所有的人都支持一个单一的叉/加入调度(针对一个线程池运行容量4,如果我没有记错的话).这就是饥饿来自的地方.
从注释到默认的Actor实现:
可以将运行时系统配置为使用更大的线程池大小(例如,通过设置
actors.corePoolSizeJVM属性). 可以重写特征的scheduler方法Actor以返回aResizableThreadPoolScheduler,其调整其线程池的大小以避免由调用任意阻塞方法的actor引起的饥饿.该actors.enableForkJoinJVM属性可以被设置为false,在这种情况下ResizableThreadPoolScheduler,使用默认执行的行动者.
| 归档时间: |
|
| 查看次数: |
1622 次 |
| 最近记录: |