我有一个100个运行Actors的actor池,它共享一个工作窃取调度程序,其CorePoolSize设置为100.但是现在当向一个Actors发送19条消息时,19条消息没有并行化到19个Actors,只有5条消息正在运行在平行下.当这5条消息完成后,接下来的5条消息将由这些相同的5个Actors再次处理,依此类推.为什么我的19条消息并行运行,我在这里缺少什么?
我的代码基本上看起来像这样:
object TestActor {
val dispatcher = Dispatchers.newExecutorBasedEventDrivenWorkStealingDispatcher("pool")
.setCorePoolSize(100)
.setMaxPoolSize(100)
.build
}
class TestActor(val name: Integer) extends Actor {
self.lifeCycle = Permanent
self.dispatcher = TestActor.dispatcher
def receive = {
case num: Integer => { println("Actor: " + name + " Received: " + num)
Thread.sleep(10000)
}
}
}
trait CyclicLoadBalancing extends LoadBalancer { this: Actor =>
val testActors: List[ActorRef]
val seq = new CyclicIterator[ActorRef](testActors)
}
trait TestActorManager extends Actor {
self.lifeCycle = Permanent
self.faultHandler = OneForOneStrategy(List(classOf[Exception]), 5, 5000)
val …
Run Code Online (Sandbox Code Playgroud) 我实现了一个简单的作业处理器来处理期货中的子作业(scala.actors.Futures).这些期货本身可以为加工子工作创造更多的未来.现在,如果其中一个子工具抛出异常,我希望作业处理器回复该作业的错误消息.我有一个解决方法来发现失败的subjobs,但我不确定这是否是最好的解决方案.基本上它的工作原理如下:
sealed trait JobResult
case class SuccessResult(content: String) extends JobResult
case class FailedResult(message: String) extends JobResult
for(subjob <- subjobs) yield {
future {
try {
SuccessResult(process(subjob))
} catch {
case e:Exception => FailedResult(e.getMessage)
}
}
}
Run Code Online (Sandbox Code Playgroud)
顶级的结果是JobResults的递归列表列表... 我递归搜索List以查找失败的结果,然后根据结果类型返回错误或组合结果.这是有效的,但我想知道是否有一个更优雅/更容易的解决方案来处理未来的例外?