在Akka演员中积累状态的正确模式

T. *_*one 10 scala akka

题:

在Akka演员中积累状态的正确模式是什么?

语境:

假设我有一些服务都返回数据.

class ServiceA extends Actor {
  def receive = {
    case _ => sender ! AResponse(100)
  }
}

class ServiceB extends Actor {
   def receive = {
     case _ => sender ! BResponse("n")
   }
}

// ...
Run Code Online (Sandbox Code Playgroud)

我希望有一个控制/监督演员协调所有这些服务的谈话并跟踪他们的响应,然后将所有数据的响应发送回原始发件人.

class Supervisor extends Actor {
  def receive = {
    case "begin" => begin
    case AResponse(id) => ???
    case BResponse(letter) => ???
  }

 // end goal:
 def gotEverything(id: Int, letter: String) =
   originalSender ! (id, letter)

  def begin = {
    ServiceA ! "a"
    ServiceB ! "b"
  }
}
Run Code Online (Sandbox Code Playgroud)

当服务响应进来时,如何将所有状态保持在一起?据我了解,如果我要将AResponse的值分配给,比如说,var aResponse: Intvar会随着收到不同的消息而不断变化,而且var在我等待BResponse消息的时候,我不可能依靠它.

我意识到我可以使用ask并且只是嵌套/ flatMap Future的,但是从我读到的那是一个糟糕的模式.没有Future的话,有没有办法实现这一切?

Vla*_*eev 16

因为从不同时从多个线程访问actor,所以您可以轻松地存储和改变所需的任何状态.例如,您可以这样做:

class Supervisor extends Actor {
  private var originalSender: Option[ActorRef] = None
  private var id: Option[WhateverId] = None
  private var letter: Option[WhateverLetter] = None

  def everythingReceived = id.isDefined && letter.isDefined

  def receive = {
    case "begin" =>
      this.originalSender = Some(sender)
      begin()

    case AResponse(id) =>
      this.id = Some(id)
      if (everythingReceived) gotEverything()

    case BResponse(letter) =>
      this.letter = Some(letter)
      if (everythingReceived) gotEverything()
  }

  // end goal:
  def gotEverything(): Unit = {
    originalSender.foreach(_ ! (id.get, letter.get))
    originalSender = None
    id = None
    letter = None
  }

  def begin(): Unit = {
    ServiceA ! "a"
    ServiceB ! "b"
  }
}
Run Code Online (Sandbox Code Playgroud)

然而,还有一种更好的方法.您可以使用没有显式状态变量的actor模拟某种状态机.这是使用become()机制完成的.

class Supervisor extends Actor {
  def receive = empty

  def empty: Receive = {
    case "begin" =>
      AService ! "a"
      BService ! "b"
      context become noResponses(sender)
  }

  def noResponses(originalSender: ActorRef): Receive = {
    case AResponse(id) => context become receivedId(originalSender, id)
    case BResponse(letter) => context become receivedLetter(originalSender, letter)
  }

  def receivedId(originalSender: ActorRef, id: WhateverId): Receive = {
    case AResponse(id) => context become receivedId(originalSender, id)
    case BResponse(letter) => gotEverything(originalSender, id, letter)
  }

  def receivedLetter(originalSender: ActorRef, letter: WhateverLetter): Receive = {
    case AResponse(id) => gotEverything(originalSender, id, letter)
    case BResponse(letter) => context become receivedLetter(originalSender, letter)
  }

  // end goal:
  def gotEverything(originalSender: ActorRef, id: Int, letter: String): Unit = {
    originalSender ! (id, letter)
    context become empty
  }
}
Run Code Online (Sandbox Code Playgroud)

这可能稍微冗长一点,但它不包含显式变量; 所有状态都隐式包含在Receive方法的参数中,当需要更新此状态时,只需切换actor的接收函数以反映此新状态.

请注意,上面的代码非常简单,当有许多"原始发件人"时,它将无法正常工作.在这种情况下,您必须为所有邮件添加一个ID,并使用它们来确定哪些响应属于哪个"原始发件人"状态,或者您可以为每个"原始发件人"创建多个参与者.