hea*_*ash 13 asynchronous scala future akka
我正试图Futures在akka中摸索并询问模式.
所以,我制作了两个演员,一个要求另一个演员给他发回信息.好吧,根据akka的Futures文档,演员应该询问(?)消息,它应该给他一个Future实例.然后演员应该阻止(使用Await)获得Future结果.
好吧,我永远不会完成我的未来.这是为什么?
代码是:
package head_thrash
import akka.actor._
import akka.util.Timeout
import scala.concurrent.Await
import scala.concurrent.duration._
object Main extends App {
val system = ActorSystem("actors")
val actor1 = system.actorOf(Props[MyActor], "node_1")
val actor2 = system.actorOf(Props[MyActor], "node_2")
actor2 ! "ping_other"
system.awaitTermination()
Console.println("Bye!")
}
class MyActor extends Actor with ActorLogging {
import akka.pattern.ask
implicit val timeout = Timeout(100.days)
def receive = {
case "ping_other" => {
val selection = context.actorSelection("../node_1")
log.info("Sending ping to node_1")
val result = Await.result(selection ? "ping", Duration.Inf) // <-- Blocks here forever!
log.info("Got result " + result)
}
case "ping" => {
log.info("Sending back pong!")
sender ! "pong"
}
}
}
Run Code Online (Sandbox Code Playgroud)
如果我换Duration.Inf到5.seconds,那么演员等待5秒,告诉我的未来是Timeouted(通过投掷TimeoutException),然后其他演员最终回复所需的消息.所以,没有异步发生.为什么?:-(
我该如何正确实施该模式?谢谢.
Arn*_*lay 10
官方的Akka 文档说Await.result将导致当前线程阻塞并等待Actor用它的回复"完成"Future.
奇怪的是,你的代码永远存在于那里,你的所有应用程序只有一个线程吗?
无论如何,我想一种更"惯用"的方式来编写代码就是对未来的成功使用回调.
def receive = {
case "ping_other" => {
val selection = context.actorSelection("../node_1")
log.info("Sending ping to node_1")
val future: Future[String] = ask(selection, "ping").mapTo[String]
future.onSuccess {
case result : String ? log.info("Got result " + result)
}
}
...
Run Code Online (Sandbox Code Playgroud)
这不起作用的两个原因.
首先,"node_1"询问自己并且"ping"将不会被处理,因为它在等待请求时阻塞.
此外,相对路径("../node_1")存在actorSelection的缺点.它使用消息传递进行处理,并且由于您的actor正在阻止它,因此无法处理任何其他消息.在即将推出的2.3版Akka中,这已得到改进,但无论如何都应该避免阻塞.
| 归档时间: |
|
| 查看次数: |
18153 次 |
| 最近记录: |