Akka,期货和关键部分

Alv*_*olo 4 concurrency scala future akka

假设我们有一个Akka演员,它根据a维持一个内部状态var.

class FooActor extends Actor {
  private var state: Int = 0

  def receive = { ... }
}
Run Code Online (Sandbox Code Playgroud)

假设接收处理程序调用一个返回未来的操作,我们使用调度程序将其映射为上下文执行程序,最后我们设置一个onSuccess改变actor状态的回调.

import context.dispatcher
def receive = {
  case "Hello" => requestSomething() // asume Future[String]
    .map(_.size)
    .onSuccess { case i => state = i }
}
Run Code Online (Sandbox Code Playgroud)

onSuccess回调中改变actor的状态是否是线程安全的,即使使用actor调度程序作为执行上下文?

Jea*_*ean 10

不,不是(akka 2.3.4 文档).

在这种情况下,您需要做的是向自己发送消息以改变状态.如果您需要订购,您可以使用藏匿并成为.像这样的东西

import akka.actor.{Stash,Actor}
import akka.pattern.pipe
case class StateUpdate(i:int)
class FooActor extends Actor with Stash{
  private var state: Int = 0
  def receive = ready
  def ready  = {
    case "Hello" => requestSomething() // asume Future[String]
      .map(StateUpdate(_.size)) pipeTo self
      become(busy)
  } 
  def busy {
     case StateUpdate(i) => 
       state=i
       unstashAll()
       become(ready)
     case State.Failure(t:Throwable) => // the future failed
     case evt =>
       stash()   
  }
}
Run Code Online (Sandbox Code Playgroud)

当然这是一个简单的实现,你可能想要处理超时和东西,以避免你的演员卡住.

如果您不需要在您的州订购保证:

case class StateUpdate(i:int)
class FooActor extends Actor with Stash{
  private var state: Int = 0
  def receive = {
    case "Hello" => requestSomething() // asume Future[String]
      .map(StateUpdate(_.size)) pipeTo self
    case StateUpdate(i) => state=i
  } 
Run Code Online (Sandbox Code Playgroud)

但是演员状态可能不是最后一个字符串的长度