我使用akka已经有一段时间了.我开始在我的代码中看到一些模式来解决异步io的延迟回复.这个实现好吗?还有另一种方法可以做一个没有阻止的延迟回复吗?
class ApplicationApi(asyncIo : ActorRef) extends Actor {
// store senders to late reply
val waiting = Map[request, ActorRef]()
def receive = {
// an actore request for a user, store it to late reply and ask for asyncIo actor to do the real job
case request : GetUser =>
waiting += (sender -> request)
asyncIo ! AsyncGet("http://app/user/" + request.userId)
// asyncio response, parse and reply
case response : AsyncResponse =>
val user = parseUser(response.body)
waiting.remove(response.request) match {
case Some(actor) => actor ! GetUserResponse(user)
}
}
}
Run Code Online (Sandbox Code Playgroud)
在等待回复时避免阻塞的一种方法是使用askmethod-aka ?运算符发送- 它返回一个Future(不同于!返回()).
使用onSuccess或foreach方法,您可以指定在将来/何时通过回复完成时要执行的操作.要使用它,你需要混合AskSupport特性:
class ApplicationApi(asyncIo : ActorRef) extends Actor with AskSupport {
def receive = {
case request: GetUser =>
val replyTo = sender
asyncIo ? AsyncGet("http://app/user/" + request.userId) onSuccess {
case response: AsyncResponse =>
val user = parseUser(response.body)
replyTo ! GetUserResponse(user)
}
}
Run Code Online (Sandbox Code Playgroud)
避免使用此技术执行任何修改ApplicationApiactor 状态的副作用,因为效果将与接收循环不同步.但是,将消息转发给其他参与者应该是安全的.
顺便说一下,这是捕获当前sender模式匹配的一部分的技巧,避免以后需要将其分配给变量.
trait FromSupport { this: Actor =>
case object from {
def unapply(msg: Any) = Some(msg, sender)
}
}
class MyActor extends Actor with FromSupport {
def receive = {
case (request: GetUser) from sender =>
// sender is now a variable (shadowing the method) that is safe to use in a closure
}
}
Run Code Online (Sandbox Code Playgroud)
| 归档时间: |
|
| 查看次数: |
781 次 |
| 最近记录: |