Eri*_*ric 8 scala http nonblocking akka
我理解如何在akka中创建基于消息的非阻塞应用程序,并且可以轻松地模拟执行并发操作的示例并将消息中的聚合结果传回.当我的应用程序必须响应HTTP请求时,我很难理解我的非阻塞选项是什么.目标是接收一个请求并立即将其移交给本地或远程演员进行工作,然后将其移交以获得可能需要一些时间的结果.不幸的是,在这个模型下,我不明白我是如何用非阻塞系列的"告诉"来表达这一点,而不是阻止"问".如果在链中的任何一点我使用了一个tell,我将来不再使用它作为最终的响应内容(http框架接口需要,在这种情况下是finagle - 但这并不重要).我理解请求是在它自己的线程上,我的例子非常人为,但只是想了解我的设计选项.
总而言之,如果我下面的设计示例可以重新设计以减少阻止,我非常希望了解如何.这是我第一次使用akka,因为一年前的一些轻度探索,以及我所看到的每篇文章,文档和谈话都表示不会阻止服务.
概念性答案可能会有所帮助,但也可能与我已经阅读过的内容相同.工作/编辑我的例子可能是我理解我试图解决的确切问题的关键.如果当前的例子通常是需要做的事情,确认也是有帮助的,所以我不会搜索不存在的魔法.
注意以下别名:import com.twitter.util.{Future => TwitterFuture,Await => TwitterAwait}
object Server {
val system = ActorSystem("Example-System")
implicit val timeout = Timeout(1 seconds)
implicit def scalaFuture2twitterFuture[T](scFuture: Future[T]): TwitterFuture[T] = {
val promise = TwitterPromise[T]
scFuture onComplete {
case Success(result) ? promise.setValue(result)
case Failure(failure) ? promise.setException(failure)
}
promise
}
val service = new Service[HttpRequest, HttpResponse] {
def apply(req: HttpRequest): TwitterFuture[HttpResponse] = req.getUri match {
case "/a/b/c" =>
val w1 = system.actorOf(Props(new Worker1))
val r = w1 ? "take work"
val response: Future[HttpResponse] = r.mapTo[String].map { c =>
val resp = new DefaultHttpResponse(HttpVersion.HTTP_1_1, HttpResponseStatus.OK)
resp.setContent(ChannelBuffers.copiedBuffer(c, CharsetUtil.UTF_8))
resp
}
response
}
}
//val server = Http.serve(":8080", service); TwitterAwait.ready(server)
class Worker1 extends Actor with ActorLogging {
def receive = {
case "take work" =>
val w2 = context.actorOf(Props(new Worker2))
pipe (w2 ? "do work") to sender
}
}
class Worker2 extends Actor with ActorLogging {
def receive = {
case "do work" =>
//Long operation...
sender ! "The Work"
}
}
def main(args: Array[String]) {
val r = service.apply(
com.twitter.finagle.http.Request("/a/b/c")
)
println(TwitterAwait.result(r).getContent.toString(CharsetUtil.UTF_8)) // prints The Work
}
}
Run Code Online (Sandbox Code Playgroud)
提前感谢您提供的任何指导!
您可以通过使用管道模式 -ie 来避免将未来作为消息发送Worker1:
pipe(w2 ? "do work") to sender
Run Code Online (Sandbox Code Playgroud)
代替:
sender ! (w2 ? "do work")
Run Code Online (Sandbox Code Playgroud)
现在r将是一个Future[String]而不是一个Future[Future[String]].
更新:上述pipe解决方案是避免让您的演员回应未来的一般方法.正如Viktor在下面的评论中指出的那样,在这种情况下,你可以Worker1通过告诉Worker2直接回应演员它(Worker1)从以下方面获得消息来完全退出循环:
w2.tell("do work", sender)
Run Code Online (Sandbox Code Playgroud)
如果Worker1负责以Worker2某种方式对响应进行操作(通过使用mapon w2 ? "do work",将多个期货与flatMapa或者for理解结合起来等),这将不是一个选项,但如果没有必要,这个版本更清洁,更有效.
这会杀死一个人Await.result.您可以通过编写如下内容来摆脱对方:
val response: Future[HttpResponse] = r.mapTo[String].map { c =>
val resp = new DefaultHttpResponse(HttpVersion.HTTP_1_1, HttpResponseStatus.OK)
resp.setContent(ChannelBuffers.copiedBuffer(c, CharsetUtil.UTF_8))
resp
}
Run Code Online (Sandbox Code Playgroud)
现在你只需要把它Future变成一个TwitterFuture.我无法告诉你如何做到这一点,但它应该是相当微不足道的,绝对不需要阻止.