在Scala中,为什么我的Sieve算法运行得如此之慢?

del*_*ber 8 performance functional-programming scala

我正在尝试使用列表和过滤器而不是数组和循环来实现Eratosthenes的Sieve.我不确定为什么以下表现比命令式等效表现差得多.100万应该绝对飞,但我的机器停止了.

  val max = 1000000
  def filterPrimes(upper: Int, seed: Int = 2, sieve: List[Int] = List()): List[Int] =
    sieve.map(x => if (x % seed == 0 && x > seed) 0 else x).filter(_ > 0)

  var filtered: List[Int] = (2 to max).toList
  for (i <- 2 to max / 2) filtered = filterPrimes(max, i, filtered)
  filtered.foreach(println(_))
Run Code Online (Sandbox Code Playgroud)

Ken*_*nde 10

如果你想看到筛子的功能性方法,请查看The Eratosthenes的真正筛子.

  • +1这是一个伟大的阅读,揭穿了经常被抛出的"高尔夫球筛". (2认同)

mer*_*ict 10

有一些潜在的问题,虽然我没有真正看到一个"吸烟枪"......无论如何,这就是我所拥有的.第一:

sieve.map(x => if (x % seed == 0 && x > seed) 0 else x).filter(_ > 0)
Run Code Online (Sandbox Code Playgroud)

可以更简洁地写成:

sieve.filter(x => x <= seed || x % seed != 0)
Run Code Online (Sandbox Code Playgroud)

接下来,upper未使用filterPrimes(这应该对性能没有影响).

第三,如果你想真正使用纯函数风格,不要使用a varfor循环,而是转变filterPrimes为尾递归函数.如果你这样做,编译器可能足够巧妙地优化掉副本(尽管我不会屏住呼吸).

最后,也许最重要的是,您的for循环浪费了大量时间来过滤掉必须已经过滤的值.例如,它在已经过滤了所有2的倍数后尝试过滤4的倍数.如果要有效地使用此筛选算法,则需要从列表中的其余元素中选择种子.

换句话说,将索引保留在列表中,并从索引中确定种子,如:

iteration 0: 2 3 4 5 6 7 8 9 ...
      index: ^

iteration 1: 2 3 5 7 9 ...
      index:   ^

iteration 2: 2 3 5 7 ...
      index:     ^
Run Code Online (Sandbox Code Playgroud)

这避免了重复的努力.此外,你不需要继续迭代,直到你到达max,我想你实际上可以在你过去时停止sqrt(max).


Dan*_*ton 5

我会做一些修改.

  • 您执行这似乎很奇怪filterPrimes所有数字之间2max / 2,"实际的"筛技术要求你只执行filterPrimes所有的素数之间的2sqrt(max).
  • 使用var和for循环看起来也很奇怪.要做到"功能"的方式,我会使用递归函数.
  • filterPrimes您可以随时收集素数,而不是在整个列表上执行; 不需要一遍又一遍地通过过滤器.
  • 这是相当奇怪的map,然后filter你的方式,因为地图只是标记要过滤的元素,你可以使用仅过滤器完成相同.

所以这是我第一次尝试这些修改:

def filterFactors(seed: Int, xs: List[Int]) = {
  xs.filter(x => x % seed != 0)
}

def sieve(max: Int) = {
  def go(xs: List[Int]) : List[Int] = xs match {
    case y :: ys => {
      if (y*y > max) y :: ys
      else y :: go(filterFactors(y, ys))
    }
    case Nil => Nil
  }

  go((2 to max).toList)
}
Run Code Online (Sandbox Code Playgroud)

然而,这反映了我的Haskell偏见,并且有一个巨大的缺陷:由于辅助函数中的递归调用y :: go(...),它将占用大量的堆栈空间go.运行sieve(1000000)导致我的"OutOfMemoryError".

让我们尝试一下常见的FP技巧:使用累加器进行尾递归.

def sieve(max: Int) = {
  def go(xs: List[Int],
         acc: List[Int])
         : List[Int] = xs match {
    case y :: ys => {
      if (y*y > max) acc.reverse ::: (y :: ys)
      else go(filterFactors(y, ys), y :: acc)
    }
    case Nil => Nil
  }

  go((2 to max).toList, Nil)
}
Run Code Online (Sandbox Code Playgroud)

通过添加累加器值,我们能够以go尾递归的形式编写辅助函数,从而避免了之前的巨大堆栈问题.(Haskell的评估策略非常不同;因此它既不需要也不会受益于尾递归)

现在让我们将速度与基于突变的命令式方法进行比较.

def mutationSieve (max: Int) = {
    var arr: Array[Option[Int]] =
        (2 to max).map (x => Some (x)).toArray
    var i = 0
    var seed = (arr (i)).get
    while (seed * seed < max) {
        for (j: Int <- (i + seed) to (max - 2) by seed) {
          arr (j) = None
        }
        i += 1
        while (arr (i).isEmpty) {
          i += 1
        }
        seed = (arr (i)).get
    }
    arr.flatten
}
Run Code Online (Sandbox Code Playgroud)

在这里,我使用an Array[Option[Int]],并通过用"None"替换其条目来"交叉"一个数字.有一点优化空间; 也许通过使用bool数组可以获得小的速度提升,其中索引代表特定的数字.随你.

使用非常原始的技术(精心布置的new Date()调用......)我将功能版本的基准测试比命令式版本慢约6倍.很明显,这两种方法具有相同的大时间复杂度,但是使用链表进行编程所涉及的常数因素确实会产生成本.

我还基准您的版本,使用Math.sqrt(max).ceil.toInt的不是max / 2:它是15倍左右,比我这里介绍的功能版本慢.有趣的是,估计1在1和1000(sqrt(1000000))之间的每7个数字中大约有1个是素数(1/ln(1000)),因此,减速的很大一部分可归因于你执行循环的事实在每一个数字上,我只为每个素数执行我的功能.当然,如果执行~1000次迭代需要15倍的时间,那么执行500000次迭代需要大约7500倍,这就是为什么你的原始代码令人痛苦地慢了.