并行计算/解析列表元素(链接)

3 parallel-processing concurrency parsing scala concurrent-programming

无法理解并行计算列表元素的正确方法是什么,但是在不计算元素(并行)时阻塞主线程.使用案例:我有一个URL链接列表和一个简单的html页面解析器,我可以通过并行解析每个页面来减少从给定页面获取信息所需的时间,然后返回一个带有一些JSON数据的简单列表.

据我所知,我有两个选择:

与期货并行的方式

我有一个方法在Future中提取一些JSON数据:

def extractData(link: String): Future[JValue] = // some implementation
Run Code Online (Sandbox Code Playgroud)

我只是将它映射到一个链接列表,其类型将是List [Future [JValue]]:

val res: List[Future[JValue]] = listOfLink.map(extractData)
Run Code Online (Sandbox Code Playgroud)

如果我调用sequence(例如来自Scalaz或我自己的实现)遍历此列表并将其转换为Future[List[JValue]],则链接仍将按顺序处理,但是一个单独的线程,这将不会给我任何效率,导致我需要的结果得到一个List[JValue].

尝试使用ParSeq进行计算

在这个选项中,我有一个只提取数据的函数:

def extractData(link: String): JValue = // some implementation
Run Code Online (Sandbox Code Playgroud)

但是这次打电话.par给集合:

val res: ParSeq[JValue] = listOfLinks.map(extractData)
Run Code Online (Sandbox Code Playgroud)

但是这样我不太明白如何在不按顺序解析每个链接的情况下阻止主列表,而不会计算空洞列表

至于Akka,我只是不能在这里使用演员,所以只有FuturePar*

Tra*_*own 5

映射集合时,并行处理链接extractData.考虑一个略微简化的例子:

import scala.concurrent._
import ExecutionContext.Implicits.global

def extractData(s: String) = future {
  printf("Starting: %s\n", s)
  val i = s.toInt
  printf("Done: %s\n", s)
  i
}

val xs = (0 to 5).map(_.toString).toList

val parsed = Future.sequence(xs map extractData)
Run Code Online (Sandbox Code Playgroud)

现在您将看到如下所示的内容,这清楚地表明这些内容没有按顺序处理:

Starting: 0
Done: 0
Starting: 2
Done: 2
Starting: 1
Starting: 4
Done: 1
Starting: 3
Starting: 5
Done: 5
Done: 4
Done: 3
Run Code Online (Sandbox Code Playgroud)

请注意,您可以使用Future.traverse以避免创建期货的中间列表:

val parsed = Future.traverse(xs)(extractData)
Run Code Online (Sandbox Code Playgroud)

无论哪种情况,您都可以阻止Await:

val res = Await.result(parsed, duration.Duration.Inf)
Run Code Online (Sandbox Code Playgroud)

作为脚注:我不知道您是否计划使用Dispatch来执行HTTP请求,但如果没有,则值得一看.它还提供了很好的集成JSON解析,文档中充满了如何使用future的有用示例.