Krz*_*zyH 10 scala akka spray-client
我在Scala中使用Akka actor从外部服务(HTTP get请求)下载资源.外部服务的响应是JSON,我必须使用分页(提供程序非常慢).我想在10个线程中同时下载所有分页结果.我使用这样的URL来下载块:http://service.com/itmes? limit = 50& offset = 1000
我创建了以下管道:
ScatterActor => RoundRobinPool[10](LoadChunkActor) => Aggreator
Run Code Online (Sandbox Code Playgroud)
ScatterActor获取要下载的项目的总数,并将其划分为块.我创建了10个LoadChunkActor来同时处理任务.
override def receive: Receive = {
case LoadMessage(limit) =>
val offsets: IndexedSeq[Int] = 0 until limit by chunkSize
offsets.foreach(offset => context.system.actorSelection(pipe) !
LoadMessage(chunkSize, offset))
}
Run Code Online (Sandbox Code Playgroud)
LoadChunkActor使用Spray发送请求.演员看起来像这样:
val pipeline = sendReceive ~> unmarshal[List[Items]]
override def receive: Receive = {
case LoadMessage(limit, offset) =>
val uri: String = s"http://service.com/items?limit=50&offset=$offset"
val responseFuture = pipeline {Get(uri)}
responseFuture onComplete {
case Success(items) => aggregator ! Loaded(items)
}
}
Run Code Online (Sandbox Code Playgroud)
如您所见,LoadChunkActor正在从外部服务请求块并添加要在onComplete上运行的回调.演员现在已准备好接收另一条消息,他正在请求另一个大块.Spray使用非阻塞API来下载块.结果外部服务充斥着我的请求,我得到了超时.
如何安排任务列表但我想同时处理最多10个任务?
我创建了以下解决方案(类似于拉http://www.michaelpollmeier.com/akka-work-pulling-pattern/:
ScatterActor (10000x messages) =>
ThrottleActor => LoadChunkActor => ThrottleMonitorActor => Aggregator
^ |
|<--------WorkDoneMessage------------|
Run Code Online (Sandbox Code Playgroud)