小编roc*_*ady的帖子

为什么我的演员调度在Akka缩小了?

我有一个100个运行Actors的actor池,它共享一个工作窃取调度程序,其CorePoolSize设置为100.但是现在当向一个Actors发送19条消息时,19条消息没有并行化到19个Actors,只有5条消息正在运行在平行下.当这5条消息完成后,接下来的5条消息将由这些相同的5个Actors再次处理,依此类推.为什么我的19条消息并行运行,我在这里缺少什么?

我的代码基本上看起来像这样:

object TestActor {
  val dispatcher = Dispatchers.newExecutorBasedEventDrivenWorkStealingDispatcher("pool")
                   .setCorePoolSize(100)
                   .setMaxPoolSize(100)
                   .build
}

class TestActor(val name: Integer) extends Actor {
    self.lifeCycle = Permanent
    self.dispatcher = TestActor.dispatcher
    def receive = {
       case num: Integer => {  println("Actor: " + name + " Received: " + num)
                               Thread.sleep(10000)
                            }
    }
}

trait CyclicLoadBalancing extends LoadBalancer { this: Actor =>
    val testActors: List[ActorRef]
    val seq = new CyclicIterator[ActorRef](testActors)
}

trait TestActorManager extends Actor {
     self.lifeCycle = Permanent
     self.faultHandler = OneForOneStrategy(List(classOf[Exception]), 5, 5000)
     val …
Run Code Online (Sandbox Code Playgroud)

scala akka

8
推荐指数
2
解决办法
1103
查看次数

如何处理Scala期货中的异常?

我实现了一个简单的作业处理器来处理期货中的子作业(scala.actors.Futures).这些期货本身可以为加工子工作创造更多的未来.现在,如果其中一个子工具抛出异常,我希望作业处理器回复该作业的错误消息.我有一个解决方法来发现失败的subjobs,但我不确定这是否是最好的解决方案.基本上它的工作原理如下:

sealed trait JobResult
case class SuccessResult(content: String) extends JobResult
case class FailedResult(message: String) extends JobResult

for(subjob <- subjobs) yield {
  future {
    try {
          SuccessResult(process(subjob))
    } catch {
      case e:Exception => FailedResult(e.getMessage)                              
    }
  }
}
Run Code Online (Sandbox Code Playgroud)

顶级的结果是JobResults的递归列表列表... 我递归搜索List以查找失败的结果,然后根据结果类型返回错误或组合结果.这是有效的,但我想知道是否有一个更优雅/更容易的解决方案来处理未来的例外?

scala future

3
推荐指数
1
解决办法
887
查看次数

标签 统计

scala ×2

akka ×1

future ×1