我在Spray应用程序中使用ask模式调用Actor,并将结果作为HTTP响应返回.我将演员的失败映射到自定义错误代码.
val authActor = context.actorOf(Props[AuthenticationActor])
callService((authActor ? TokenAuthenticationRequest(token)).mapTo[LoggedInUser]) { user =>
complete(StatusCodes.OK, user)
}
def callService[T](f: => Future[T])(cb: T => RequestContext => Unit) = {
onComplete(f) {
case Success(value: T) => cb(value)
case Failure(ex: ServiceException) => complete(ex.statusCode, ex.errorMessage)
case e => complete(StatusCodes.InternalServerError, "Unable to complete the request. Please try again later.")
//In reality this returns a custom error object.
}
}
Run Code Online (Sandbox Code Playgroud)
当authActor发送失败时,这可以正常工作,但如果authActor抛出异常,则在询问超时完成之前不会发生任何事情.例如:
override def receive: Receive = {
case _ => throw new ServiceException(ErrorCodes.AuthenticationFailed, "No valid session was found for that token")
}
Run Code Online (Sandbox Code Playgroud)
我知道Akka的文档说的是
要通过例外完成未来,您需要向发件人发送失败消息.当actor在处理消息时抛出异常时,不会自动执行此操作.
但鉴于我在Spray路由演员和服务演员之间使用了很多接口,我宁愿不用try/catch包装每个子actor的接收部分.是否有更好的方法来实现子actor中异常的自动处理,并在发生异常时立即解决未来问题?
编辑:这是我目前的解决方案.然而,为每个儿童演员做这件事都很麻烦.
override def receive: Receive = {
case default =>
try {
default match {
case _ => throw new ServiceException("")//Actual code would go here
}
}
catch {
case se: ServiceException =>
logger.error("Service error raised:", se)
sender ! Failure(se)
case ex: Exception =>
sender ! Failure(ex)
throw ex
}
}
Run Code Online (Sandbox Code Playgroud)
这样,如果它是预期的错误(即ServiceException),则通过创建失败来处理.如果它是意外的,它会立即返回失败,以便解决未来,但随后抛出异常,以便SupervisorStrategy仍然可以处理它.
cmb*_*ter 16
如果您想要一种方法在发生意外异常时自动将响应发送回发件人,那么这样的事情可能对您有用:
trait FailurePropatingActor extends Actor{
override def preRestart(reason:Throwable, message:Option[Any]){
super.preRestart(reason, message)
sender() ! Status.Failure(reason)
}
}
Run Code Online (Sandbox Code Playgroud)
我们覆盖preRestart并将故障传播回发送方,Status.Failure这将导致上游Future失败.此外,重要的是要super.preRestart在这里打电话,因为这是儿童停止发生的地方.在演员中使用它看起来像这样:
case class GetElement(list:List[Int], index:Int)
class MySimpleActor extends FailurePropatingActor {
def receive = {
case GetElement(list, i) =>
val result = list(i)
sender() ! result
}
}
Run Code Online (Sandbox Code Playgroud)
如果我是这样调用这个actor的实例:
import akka.pattern.ask
import concurrent.duration._
val system = ActorSystem("test")
import system.dispatcher
implicit val timeout = Timeout(2 seconds)
val ref = system.actorOf(Props[MySimpleActor])
val fut = ref ? GetElement(List(1,2,3), 6)
fut onComplete{
case util.Success(result) =>
println(s"success: $result")
case util.Failure(ex) =>
println(s"FAIL: ${ex.getMessage}")
ex.printStackTrace()
}
Run Code Online (Sandbox Code Playgroud)
然后它会正确击中我的Failure块.现在,当Futures没有参与扩展该特征的actor 时,该基本特征中的代码运行良好,就像这里的简单actor一样.但是如果你使用Futures然后你需要小心,因为发生的异常Future不会导致在actor中重新启动,而且,在preRestart调用sender()中将不会返回正确的引用,因为actor已经移动到下一条消息中.像这样的演员表明了这个问题:
class MyBadFutureUsingActor extends FailurePropatingActor{
import context.dispatcher
def receive = {
case GetElement(list, i) =>
val orig = sender()
val fut = Future{
val result = list(i)
orig ! result
}
}
}
Run Code Online (Sandbox Code Playgroud)
如果我们在之前的测试代码中使用此actor,我们将始终在失败情况下获得超时.为了缓解这种情况,您需要将期货结果反馈给发件人,如下所示:
class MyGoodFutureUsingActor extends FailurePropatingActor{
import context.dispatcher
import akka.pattern.pipe
def receive = {
case GetElement(list, i) =>
val fut = Future{
list(i)
}
fut pipeTo sender()
}
}
Run Code Online (Sandbox Code Playgroud)
在这种特殊情况下,actor本身不会重新启动,因为它没有遇到未捕获的异常.现在,如果你的演员需要在未来之后进行一些额外的处理,那么当你得到一个时,你可以回到自己并显式地失败Status.Failure:
class MyGoodFutureUsingActor extends FailurePropatingActor{
import context.dispatcher
import akka.pattern.pipe
def receive = {
case GetElement(list, i) =>
val fut = Future{
list(i)
}
fut.to(self, sender())
case d:Double =>
sender() ! d * 2
case Status.Failure(ex) =>
throw ex
}
}
Run Code Online (Sandbox Code Playgroud)
如果这种行为变得普遍,你可以将它提供给任何需要它的演员:
trait StatusFailureHandling{ me:Actor =>
def failureHandling:Receive = {
case Status.Failure(ex) =>
throw ex
}
}
class MyGoodFutureUsingActor extends FailurePropatingActor with StatusFailureHandling{
import context.dispatcher
import akka.pattern.pipe
def receive = myReceive orElse failureHandling
def myReceive:Receive = {
case GetElement(list, i) =>
val fut = Future{
list(i)
}
fut.to(self, sender())
case d:Double =>
sender() ! d * 2
}
}
Run Code Online (Sandbox Code Playgroud)