并行化Scala的迭代器

Rok*_*alj 15 parallel-processing iterator scala scala-collections

请注意:这不是一个重复的问题,因为这个问题规定了所有方法Iterator,而不仅仅是mapflatMap.因此Future.traverse不是一个好的答案.

假设我有这个简单的陈述:

(1 to 100).toSet.subsets.find(f)
Run Code Online (Sandbox Code Playgroud)

它完美地运作.它是懒惰的,不会使用大量内存,只要找到一个元素就会返回.当您想并行化时,问题就开始了.你也许会说,这是斯卡拉,必须有.parIterator,但没有.

互联网上提出的解决方案是使用.grouped,但它不如我想要的那么好.为什么?

val it = (1 to 100).toSet.subsets.grouped(1000000).map(_.par.find(f)).flatten
if (it.hasNext) Some(it.next) else None
Run Code Online (Sandbox Code Playgroud)
  1. 使用更多的内存.我知道它仍然是O(1),但让我们在这里完美:)

  2. 它不是完全可并行化的(根据Amdahl定律).当.grouped消耗下一个百万元素块的迭代器时,除了一个线程之外的所有元素都在等待.如果迭代器消耗昂贵,则这尤其成问题.此外,还需要产生一组新线程来处理新块的开销.

  3. 生成更复杂/更长的代码(参见示例).如果Iterator.nextOption,它会缩短代码,但仍然.

尽管编程我自己的生产者 - 消费者模型(迭代器是生产者,线程是消费者)然后最终减少步骤,还有什么吗?

edo*_*fic 2

您可以使用.toStream。这将产生一个惰性流来记忆值。它有.par它。

它会在堆上分配一些包装器,但如果你小心(不保留指向流的指针),这只会导致 GC 压力,但不会增加剩余内存占用。它仍然会走得很快。请注意,并行集合会产生相当多的开销,如果每个元素的计算不够昂贵,则可能不值得。

迭代器的级别太低,无法并行化。但您实际上并不需要并行迭代器,而是需要并行遍历迭代器,您可以从标准库中使用Future.traverse 。

  • 此解决方案的问题是:Stream 上的“.par”返回普通的“ParSeq”,这意味着整个流在并行计算之前已被消耗。 (6认同)