使用Akka和Scalatra

par*_*rsa 4 concurrency scala akka scalatra

我的目标是为我的小部件构建一个高度并发的后端.我现在将后端作为Web服务公开,它接收运行特定小部件的请求(使用Scalatra),从DB获取小部件的代码并在演员(使用Akka)中运行它,然后回复结果.所以想象我做的事情如下:

get("/run:id") {
  ...
  val actor = Actor.actorOf("...").start
  val result = actor !! (("Run",id), 10000)
  ...
} 
Run Code Online (Sandbox Code Playgroud)

现在我相信这不是最好的并发解决方案,我应该在一个actor实现中以某种方式结合侦听请求和运行小部件.您如何设计这个以获得最大的并发性?谢谢.

Cas*_*Jim 5

您可以在akka引导文件或您自己的ServletContextListener中启动您的actor,以便它们在不依赖于servlet的情况下启动.然后你可以用akka注册表查找它们.

Actor.registry.actorFor[MyActor] foreach { _ !! (("Run",id), 10000) }
Run Code Online (Sandbox Code Playgroud)

除此之外,目前还没有与scalatra真正整合akka.因此,到目前为止,您可以做的最好的事情是对一群演员使用阻止请求.

我不确定,但我不需要为每个请求生成一个actor,而是拥有一个可以发送这些请求的widget actor池.如果您使用管理程序层次结构,则可以使用管理程序调整池的大小(如果池太大或太小).

class MyContextListener extends ServletContextListener {

  def contextInitialized(sce: ServletContextEvent) {
    val factory = SupervisorFactory(
      SupervisorConfig(
      OneForOneStrategy(List(classOf[Exception]), 3, 1000),
      Supervise(actorOf[WidgetPoolSupervisor], Permanent)
  }

  def contextDestroyed(sce: ServletContextEvent) {
    Actor.registry.shutdownAll()
  }
}

class WidgetPoolSupervisor extends Actor {

  self.faultHandler = OneForOneStrategy(List(classOf[Exception]), 3, 1000)

  override def preStart() {
    (1 to 5) foreach { _ =>
       self.spawnLink[MyWidgetProcessor]
    }
    Scheduler.schedule(self, 'checkPoolSize, 5, 5, TimeUnit.MINUTES)
  }

  protected def receive = {
    case 'checkPoolSize => {
      //implement logic that checks how quick the actors respond and if 
      //it takes to long add some actors to the pool.
      //as a bonus you can keep downsizing the actor pool until it reaches 1
      //or until the message starts returning too late.
    }
  }
}

class ScalatraApp extends ScalatraServlet {

  get("/run/:id") {
    // the !! construct should not appear anywhere else in your code except
    // in the scalatra action. You don't want to block anywhere else, but in a 
    // scalatra action it's ok as the web request itself is synchronous too and needs to 
    // to wait for the full response to have come back anyway.
    Actor.registry.actorFor[MyWidgetProcessor] foreach { 
      _ !! ((Run, id), 10000) 
    } getOrElse {
      throw new HeyIExpectedAResultException()
    } 
  }
}
Run Code Online (Sandbox Code Playgroud)

请将上面的代码视为恰好看起来像scala的伪代码,我只是想说明这个概念.