在找到所需数量的结果时,在早期中止时过滤Scala的并行集合

Ale*_*ets 10 parallel-processing scala parallel-collections

给定了一个非常大的collection.parallel.mutable.ParHashMap(或任何其他并行集合)实例,如果找到一个给定的,比如50个匹配的数量,那么如何中止过滤并行扫描?

尝试在线程安全的"外部"数据结构中累积中间匹配或保持外部AtomicInteger的结果计数在4个内核上比使用常规collection.mutable.HashMap慢2到3倍,并将单个内核挂在100 %.

我知道Par*集合中的查找存在确实在"内部"中止.有没有办法概括这个以找到多个结果?

下面是这似乎仍然是2至3倍慢上〜79,000项的ParHashMap的代码,也有馅的问题的maxResults(成果转化的结果CHM这可能是由于线程后被抢占incrementAndGet但在此之前突破这允许其他线程添加更多元素).更新:似乎减速是由于工作者线程在counter.incrementAndGet()上竞争,这当然违背了整个并行扫描的目的:-(

def find(filter: Node => Boolean, maxResults: Int): Iterable[Node] =
{
  val counter = new AtomicInteger(0)
  val results = new ConcurrentHashMap[Key,  Node](maxResults)

  import util.control.Breaks._

  breakable
  {
    for ((key, node) <- parHashMap if filter(node))
    {
      results.put(key, node)
      val total = counter.incrementAndGet()
      if (total > maxResults) break
    }
  }

  results.values.toArray(new Array[Node](results.size))
}
Run Code Online (Sandbox Code Playgroud)

Jai*_*rge 0

您可以尝试获取迭代器,然后创建一个惰性列表(流),在其中进行过滤(使用谓词)并获取所需的元素数量。因为它是非严格的,所以不会评估元素的这种“获取”。之后,您可以通过向整个事物添加“.par”来强制执行并实现并行化。

示例代码:

具有随机值的并行化映射(模拟并行哈希映射):

scala> myMap
res14: scala.collection.parallel.immutable.ParMap[Int,Int] = ParMap(66978401 -> -1331298976, 256964068 -> 126442706, 1698061835 -> 1622679396, -1556333580 -> -1737927220, 791194343 -> -591951714, -1907806173 -> 365922424, 1970481797 -> 162004380, -475841243 -> -445098544, -33856724 -> -1418863050, 1851826878 -> 64176692, 1797820893 -> 405915272, -1838192182 -> 1152824098, 1028423518 -> -2124589278, -670924872 -> 1056679706, 1530917115 -> 1265988738, -808655189 -> -1742792788, 873935965 -> 733748120, -1026980400 -> -163182914, 576661388 -> 900607992, -1950678599 -> -731236098)
Run Code Online (Sandbox Code Playgroud)

获取一个迭代器并从迭代器创建一个 Stream 并对其进行过滤。在这种情况下,我的谓词仅接受(映射的值成员的)对。我想要获得 10 个偶数元素,因此我采用 10 个元素,只有当我强制执行以下操作时才会对这些元素进行求值:

scala> val mapIterator = myMap.toIterator
mapIterator: Iterator[(Int, Int)] = HashTrieIterator(20)


scala> val r = Stream.continually(mapIterator.next()).filter(_._2 % 2 == 0).take(10)
r: scala.collection.immutable.Stream[(Int, Int)] = Stream((66978401,-1331298976), ?)
Run Code Online (Sandbox Code Playgroud)

最后,我强制评估,按计划只得到 10 个元素

scala> r.force
res16: scala.collection.immutable.Stream[(Int, Int)] = Stream((66978401,-1331298976), (256964068,126442706), (1698061835,1622679396), (-1556333580,-1737927220), (791194343,-591951714), (-1907806173,365922424), (1970481797,162004380), (-475841243,-445098544), (-33856724,-1418863050), (1851826878,64176692))
Run Code Online (Sandbox Code Playgroud)

通过这种方式,您只需获得所需数量的元素(无需处理剩余元素),并且无需锁、原子操作或中断即可并行处理该过程。

请将其与您的解决方案进行比较,看看它是否有任何好处。