在能够处理其他一些消息之前初始化一个actor

アレッ*_*ックス 7 asynchronous scala future akka

我有一个演员创造另一个:

class MyActor1 extends Actor {
  val a2 = system actorOf Props(new MyActor(123))
}
Run Code Online (Sandbox Code Playgroud)

第二个actor必须在创建后自行初始化(bootstrap),并且只有在此之后它必须能够完成其他工作.

class MyActor2(a: Int) extends Actor {
  //initialized (bootstrapped) itself, potentially a long operation 
  //how?
  val initValue = // get from a server

  //handle incoming messages
  def receive = {
    case "job1" => // do some job but after it's initialized (bootstrapped) itself
  }
}
Run Code Online (Sandbox Code Playgroud)

因此,首先MyActor2必须做的是做一些初始化自己的工作.这可能需要一些时间,因为它是对服务器的请求.只有在成功完成后,它必须能够处理传入的消息receive.在那之前 - 它不能那样做.

当然,对服务器的请求必须是异步的(最好是使用Future,不是async,await或其他高级别的东西AsyncHttpClient).我知道如何使用Future,但这不是问题.

我该如何确保?

ps我的猜测是它必须首先向自己发送消息.

sen*_*nia 10

您可以使用become方法在初始化后更改actor的行为:

class MyActor2(a: Int) extends Actor {

  server ! GetInitializationData

  def initialize(d: InitializationData) = ???

  //handle incoming messages
  val initialized: Receive = {
    case "job1" => // do some job but after it's initialized (bootstrapped) itself
  }

  def receive = {
    case d @ InitializationData =>
      initialize(d)
      context become initialized
  }
}
Run Code Online (Sandbox Code Playgroud)

请注意,此类actor将在初始化之前删除所有消息.您必须手动保留这些消息,例如使用Stash:

class MyActor2(a: Int) extends Actor with Stash {

  ...

  def receive = {
    case d @ InitializationData =>
      initialize(d)
      unstashAll()
      context become initialized
    case _ => stash()
  }
}
Run Code Online (Sandbox Code Playgroud)

如果您不想var用于初始化,可以使用InitializationData如下方法创建初始化行为:

class MyActor2(a: Int) extends Actor {

  server ! GetInitializationData

  //handle incoming messages
  def initialized(intValue: Int, strValue: String): Receive = {
    case "job1" => // use `intValue` and `strValue` here
  }

  def receive = {
    case InitializationData(intValue, strValue) =>
      context become initialized(intValue, strValue)
  }
}
Run Code Online (Sandbox Code Playgroud)


mav*_*ein 5

我不知道提议的解决方案是个好主意.发送初始化消息对我来说似乎很尴尬.演员有一个生命周期并提供一些钩子.当你看一下API时,你会发现它prestart.

因此,我提出以下建议:

  • 创建actor时,会运行其preStart挂钩,您可以在其中执行返回future的服务器请求.
  • 虽然未来未完成,但所有传入的消息都被隐藏起来.
  • 当未来完成时,它使用context.become来使用您的真实/普通接收方法.
  • 在成为你解开一切之后.

这是代码的粗略草图(不好的解决方案,请参阅下面的实际解决方案):

class MyActor2(a: Int) extends Actor with Stash{

  def preStart = {
    val future = // do your necessary server request (should return a future)
    future onSuccess {
      context.become(normalReceive)
      unstash()
    }
  }

  def receive = initialReceive

  def initialReceive = {
    case _ => stash()
  }

  def normalReceive = {
    // your normal Receive Logic
  }
}
Run Code Online (Sandbox Code Playgroud)

更新:根据Senias的反馈改进了解决方案

class MyActor2(a: Int) extends Actor with Stash{

  def preStart = {
    val future = // do your necessary server request (should return a future)
    future onSuccess {
      self ! InitializationDone
    }
  }

  def receive = initialReceive

  def initialReceive = {
    case InitializationDone =>
      context.become(normalReceive)
      unstash()
    case _ => stash()
  }

  def normalReceive = {
    // your normal Receive Logic
  }

  case class InitializationDone
}
Run Code Online (Sandbox Code Playgroud)

  • 对不起,但这是一个糟糕的解决方案.`onSuccess`将在某个与你的actor没有任何关系的线程中执行.你会在`stash()`和`unstash()`上遇到竞争条件.永远不要分享演员的状态!它打破了线程安全.改变actor状态的唯一有效方法是消息处理. (2认同)