使用Scala + Akka进行奇怪的尝试/捕获行为

Rus*_*eks 2 scala exception akka

我正在尝试使用Akka为自定义应用程序协议实现TCP服务器.我试图按照这里给出的例子:http://doc.akka.io/docs/akka/2.0/scala/io.html在for ... yield循环中进行非阻塞IO.

我发现当我从yield块中抛出异常时,我无法从块外部捕获它.我想我对Akka或Scala如何在这里工作有一个根本的误解,我会感激任何提示.

我把代码归结为:

import akka.actor._
import java.net.InetSocketAddress

class EchoServer(port: Int) extends Actor {

  val state = IO.IterateeRef.Map.async[IO.Handle]()(context.dispatcher)

  override def preStart {
    IOManager(context.system) listen new InetSocketAddress(port)
  }

  def receive = {
    case IO.NewClient(server) =>
      val socket = server.accept()
      state(socket) flatMap (_ => EchoServer.processRequest(socket))
    case IO.Read(socket, bytes) =>
      state(socket)(IO.Chunk(bytes))
    case IO.Closed(socket, cause) =>
      state(socket)(IO.EOF(None))
      state -= socket
  }
}

object EchoServer extends App
{
  def processRequest(socket: IO.SocketHandle): IO.Iteratee[Unit] =
  {
    println( "In process request")
    try {
      for {
        bs <- IO take 1
      } yield {
        println("I'll get here")
        throw new Exception("Hey-o!")
        println("But not here ... as expected")
      }
    } catch {
      case e: Exception => println("And not here ... wtf?"); IO.Done()  // NEVER GETS HERE
    }
  }

  ActorSystem().actorOf(Props(new EchoServer(8080)))
}
Run Code Online (Sandbox Code Playgroud)

也许更方便地遵循这里的要点:https://gist.github.com/2296554

任何人都可以解释为什么在这种情况下控制没有达到我的阻挡块?

我注意到如果我在Akka中打开调试日志记录,我会在输出中看到此消息:

[DEBUG] [04/03/2012 22:42:25.106] [EchoServerActorSystem-akka.actor.default-dispatcher-1] [Future] Hey-o!
Run Code Online (Sandbox Code Playgroud)

所以我猜这个例外是由Akka调度员处理的?任何人都能解释一下这是怎么回事吗?

Deb*_*ski 6

非阻塞IO的重点当然是无法保证执行的时间和地点.请记住,可以将for comprehension写为

(IO take 1).map(bs => {
  println("I'll get here"); throw // ...
}
Run Code Online (Sandbox Code Playgroud)

这段代码有什么作用?IO take 1返回一些非阻塞Future类的东西,然后通过该map方法附加一个变换函数.即每当(和任何地方)IO take 1准备就绪时,它将应用于map结果.

所有这些都发生在其他一些线程中(或者使用其他一些实现非阻塞语义的方式),因此没有办法让try- catch对任何Exception被抛出的s 做出反应.该bs => println(…) …方法也不会知道您的异常处理.所有它都知道它应该转换一些输入bs并在结束时产生结果.

需要吸取的教训:使用非阻塞代码时避免副作用.尤其如此,如果使用副作用来改变执行流程.

为了实际捕获异常,我认为你必须按如下方式构造它(未经测试;参见API):

def processRequest(socket: IO.SocketHandle): IO.Iteratee[Unit] =
  {
    println( "In process request")
    (
      for {
        bs <- IO take 1
      } yield {
        println("I'll get here")
        throw new Exception("Hey-o!")
        println("But not here ... as expected")
      }
    ) recover {
      case e: Exception => println("And here ...?"); IO.Done()
    }
  }
Run Code Online (Sandbox Code Playgroud)