限制并发Web服务请求(或某些批处理方法)

Far*_*mor 9 asynchronous web-services scala playframework playframework-2.0

我是一个理解,从Web服务中获取逗号分隔的Id列表.
然后我使用Id List进行新的调用,我的问题是Id List大约是10000个长,每个调用都是一个中等大小的XML文档.
当我同时请求所有10 000个异步时,Web服务端点,或者它可能是Play Framework,并不是很喜欢它,因为我只得到大约500个正确的响应.

一些伪代码突出了意图.

for {
  respA <- WS.url(url1).get
  id <- respA.body.split(",")
  respB <- WS.url(url2 + id).get
} yield ...
Run Code Online (Sandbox Code Playgroud)

我如何将并发请求限制为更可行的?

Jam*_*ard 10

这是一个示例应用程序,它将10,000个请求(通过Play的WS库)批量分组为1,000个 - 所有这些都是异步和非阻塞方式:

package controllers

import play.api.libs.concurrent.Promise
import scala.concurrent.duration._
import play.api.libs.ws.WS
import scala.concurrent.{Await, Future}
import scala.concurrent.ExecutionContext.Implicits.global
import play.api.mvc.{Action, Controller}
import play.api.libs.ws.Response
import play.api.Logger

object Application extends Controller {

  var numRequests = 0

  def index = Action {
    Async {
      val batches: Iterator[Seq[WS.WSRequestHolder]] = requests.grouped(1000)

      val allBatchesFutureResponses = batches.foldLeft(Future.successful(Seq.empty[Response])) { (allFutureResponses, batch) =>
        allFutureResponses.flatMap { responses =>
          val batchFutures = Future.sequence(batch.map(_.get))
          batchFutures.map { batchResponses =>
            responses ++ batchResponses
          }
        }
      }

      allBatchesFutureResponses.map { responses =>
        Logger.info(responses.size.toString)
        Ok
      }
    }
  }

  def requests = (1 to 10000).map { i =>
    WS.url("http://localhost:9000/pause")
  }

  def pause = Action {
    Async {
      Logger.info(numRequests.toString)
      numRequests = numRequests + 1
      Promise.timeout(Ok, 1 seconds)
    }
  }

}
Run Code Online (Sandbox Code Playgroud)


the*_*eon 7

你需要做一些限制.

阿卡

如何使用一些Akka Actors来提出请求?看看这些用akka限制的方法:

只是与期货

如果你只想使用Futures而不使用Akka Actor,你可以结合使用flatMap(将HTTP请求链接起来一个接一个)并Future.sequence获得你想要的并行度.