在处理Akka中的下一条消息之前等待异步Future调用

Jam*_*ies 16 scala actor akka

接收事件时,Akka Actors将一次处理一条消息,阻塞直到请求完成,然后再转到下一条消息.

这适用于同步/阻塞任务,但是如果我想执行异步/非阻塞请求,Akka将继续处理而无需等待任务完成.

例如:

 def doThing():Future[Unit] = /* Non blocking request here */

 def receive = {
     case DoThing => doThing() pipeTo sender
 } 
Run Code Online (Sandbox Code Playgroud)

这将调用doThing()并开始处理未来,但在处理下一条消息之前不会等待它完成 - 它将尽可能快地执行队列中的下一条消息.

实质上,似乎Akka认为"返回未来"是"完成处理"并转移到下一条消息.

为了一次处理一条消息,似乎我需要主动阻止Actor线程来阻止它这样做

def receive = {
    case DoThing => sender ! blocking(Await.result(doThing()))
}
Run Code Online (Sandbox Code Playgroud)

这感觉就像一个非常错误的方法 - 它是人为地阻塞代码中的一个线程,否则它应该是完全无阻塞的.

当将Akka与Elixir演员比较时,我们可以通过使用尾调用来请求下一条消息而不需要人为阻塞来轻松避免这个问题.

在Akka还有什么办法吗?

a)等待a Future完成,然后处理下一条消息而不阻塞线程.

b)使用显式尾调用或其他一些机制来使用基于拉的工作流而不是基于推送?

Dav*_*ria 9

与评论中建议的一样,您可以使用Stash(http://doc.akka.io/docs/akka/current/scala/actors.html#Stash)特征在等待Future解决时存储传入的消息.

需要保存当前发件人,以便您不会不正确地关闭发件人actor参考.您可以通过类似下面定义的简单案例类来实现此目的.

class MyActor extends Actor with Stash {

  import context.dispatcher

  // Save the correct sender ref by using something like
  // case class WrappedFuture(senderRef: ActorRef, result: Any)
  def doThing(): Future[WrappedFuture] = ???

  override def receive: Receive = {
    case msg: DoThing =>
      doThing() pipeTo self

      context.become({
        case WrappedFuture(senderRef, result) =>
          senderRef ! result
          unstashAll()
          context.unbecome()
        case newMsg: DoThing =>
          stash()
      }, discardOld = false)
  }
}
Run Code Online (Sandbox Code Playgroud)