Java/Scala Future由回调驱动

Kal*_*dre 6 java scala nonblocking akka playframework-2.0

精简版:

如何创建一个Promise<Result>在回调触发器上完成的?

长版:

我正在开发一个处理第三方SOAP服务的应用程序.来自用户的请求同时委托多个SOAP服务,聚合结果并发送回用户.

系统需要是可扩展的,并且应该允许多个并发用户.当每个用户请求最终触发大约10个Web服务调用并且每个呼叫阻塞大约1秒时,系统需要设计为具有非阻塞I/O.

我在Play Framework(Java)中使用Apache CXF用于此系统.我已设法生成异步WS客户端代理并启用异步传输.我无法弄清楚当我委托给多个Web服务代理时如何返回Future to Play的Thread,结果将作为回调获得.

选项1:使用返回Java Future的异步方法调用.

正如java.util.concurrent.Future线程的scala.concurrent.Future包装器中所描述的那样,我们无法将Java Future转换为Scala Future.从Future获得结果的唯一方法是Future.get()阻止调用者.由于CXF生成的代理返回Java Future,因此排除了此选项.

选项2:使用Scala Future.

由于CXF生成代理接口,我不确定是否有任何方式可以干预并返回Scala Future(AFAIK Akka使用Scala Futures)而不是Java Future?

选项3:使用回调方法.

由CXF生成的返回Java Future的异步方法也需要一个回调对象,我想这将在结果准备好时提供回调.要使用这种方法,我需要返回一个将等待我收到回调的Future.

我认为选项3是最有希望的,虽然我对如何返回一个将在收到回调时完成的Promise没有任何想法.我可能有一个线程在while(true)等待,直到结果可用.再一次,我不知道如何在wait不阻塞线程的情况下进入?

简而言之,我正在尝试构建一个系统,该系统正在进行大量的SOAP Web服务调用,其中每个调用都会占用大量时间.在许多并发Web服务调用的情况下,系统可能很容易耗尽线程.我正在寻找一种基于非阻塞I/O的解决方案,它可以同时允许许多正在进行的Web服务调用.

bjf*_*her 4

选项 3 看起来不错:) 首先导入一些内容...

import scala.concurrent.{Await, Promise}
import scala.concurrent.duration.Duration
Run Code Online (Sandbox Code Playgroud)

为了说明这一点,下面是一个接受回调的模拟 CXF API:

def fetch(url: String, callback: String => Unit) = {
  callback(s"results for $url")
}
Run Code Online (Sandbox Code Playgroud)

创建一个promise,以promise作为回调调用API:

val promise = Promise[String]
fetch("http://corp/api", result => promise.success(result))
Run Code Online (Sandbox Code Playgroud)

然后您可以将promise.futurewhich 的实例Future放入您的 Play 应用程序中。

要测试它,您可以这样做:

Await.result(promise.future, Duration.Inf)
Run Code Online (Sandbox Code Playgroud)

这将阻止等待结果,此时您应该在控制台中看到“ http://corp/api的结果”。

  • 感谢您提供示例代码,尽管我使用的是 Play Java。我认为我错过的一点是“Promise”正是实现这一目标的工具。对于 Play Java,它是“play.libs.F.RedeemablePromise”。因此,我可以返回一个“RedeemablePromise”实例,并传递“javax.xml.ws.AsyncHandler”的句柄,其中嵌入了 Promise 对象,以便回调对象可以通过“success()”调用委托给 Promise。我想我现在明白了 Future 和 Promise 之间的区别,我过去认为它们非常相似。 (2认同)