Akka:正确使用`ask`模式?

hea*_*ash 13 asynchronous scala future akka

我正试图Futures在akka中摸索并询问模式.

所以,我制作了两个演员,一个要求另一个演员给他发回信息.好吧,根据akka的Futures文档,演员应该询问(?)消息,它应该给他一个Future实例.然后演员应该阻止(使用Await)获得Future结果.

好吧,我永远不会完成我的未来.这是为什么?

代码是:

package head_thrash

import akka.actor._
import akka.util.Timeout
import scala.concurrent.Await
import scala.concurrent.duration._

object Main extends App {

  val system = ActorSystem("actors")

  val actor1 = system.actorOf(Props[MyActor], "node_1")
  val actor2 = system.actorOf(Props[MyActor], "node_2")

  actor2 ! "ping_other"

  system.awaitTermination()

  Console.println("Bye!")
}

class MyActor extends Actor with ActorLogging {
  import akka.pattern.ask

  implicit val timeout = Timeout(100.days)

  def receive = {
    case "ping_other" => {
      val selection = context.actorSelection("../node_1")
      log.info("Sending ping to node_1")
      val result = Await.result(selection ? "ping", Duration.Inf) // <-- Blocks here forever!
      log.info("Got result " + result)
    }
    case "ping" => {
      log.info("Sending back pong!")
      sender ! "pong"
    }
  }
}
Run Code Online (Sandbox Code Playgroud)

如果我换Duration.Inf5.seconds,那么演员等待5秒,告诉我的未来是Timeouted(通过投掷TimeoutException),然后其他演员最终回复所需的消息.所以,没有异步发生.为什么?:-(

我该如何正确实施该模式?谢谢.

Arn*_*lay 10

官方的Akka 文档说Await.result将导致当前线程阻塞并等待Actor用它的回复"完成"Future.

奇怪的是,你的代码永远存在于那里,你的所有应用程序只有一个线程吗?

无论如何,我想一种更"惯用"的方式来编写代码就是对未来的成功使用回调.

def receive = {
    case "ping_other" => {
      val selection = context.actorSelection("../node_1")
      log.info("Sending ping to node_1")
      val future: Future[String] = ask(selection, "ping").mapTo[String]
      future.onSuccess { 
         case result : String ? log.info("Got result " + result)
      }
    }
...
Run Code Online (Sandbox Code Playgroud)

  • Akka应用程序不是单线程的.它的工作方式是actor从线程池处理线程上的邮箱消息,然后在处理完消息后释放该线程.这意味着如果您阻止接收,该线程将被阻止,并且该actor将不再处理任何消息.因此,您应该避免阻止. (3认同)
  • 更“惯用”的方式 - 使用 `pipe` 将 Future 的结果发送给 Actor (3认同)

Pat*_*all 5

这不起作用的两个原因.

首先,"node_1"询问自己并且"ping"将不会被处理,因为它在等待请求时阻塞.

此外,相对路径("../node_1")存在actorSelection的缺点.它使用消息传递进行处理,并且由于您的actor正在阻止它,因此无法处理任何其他消息.在即将推出的2.3版Akka中,这已得到改进,但无论如何都应该避免阻塞.