用Haskell筛选素数

Mat*_*hid 5 primes haskell

好的,所以我正在尝试编写一个Haskell程序,它可以非常快速地计算素数.据推测,我不是第一个尝试这样做的人.(特别是,我确定我看到了一些现有技术,但我现在找不到它......)

最初,我想计算小于10 ^ 11的素数.目前我已经让我的程序运行了大约15分钟,它甚至还没到中途.一些狂热的C++程序员声称他的程序只需要8个 分钟.很明显,我正在做一些可怕的错误.

(如果重要的话,我当前的实现使用IOUArray Integer Bool多个线程来处理搜索空间的独立子范围.目前从10MB数组块中删除所有2的倍数需要几秒钟...)

请注意,10 ^ 11对于32位算术来说太大了.此外,10 ^ 11位= 12.5 GB,远远太多数据以适应Haskell的32位地址空间.所以你不能一次在内存中拥有整个位图.最后,请注意,小于10 ^ 11的素数只是小于2 ^ 32的阴影,因此您无法同时存储实际的整数.


编辑:显然我误读了时间信息.C++家伙实际声称的是:

  • 使用一个核心计算素数<10 ^ 11需要8分钟,使用4个核心需要56秒.(未指定CPU类型.)

  • 计数素数<10 ^ 10需要5秒.(不确定使用了多少个核心.)

抱歉错误...

编辑:我的源代码可以在这里找到:http://hpaste.org/72898

app*_*ive 13

使用arithmoi优秀StackOverflow教师D​​aniel Fischer 的软件包:

import Math.NumberTheory.Primes.Counting

main = print $ primeCount (10^11)

-- $ time ./arith
-- 4118054813
-- 
-- real 0m0.209s
-- user 0m0.198s
-- sys  0m0.008s
Run Code Online (Sandbox Code Playgroud)

这比你的'狂热'C++朋友写的快40倍; 也许他可以学习一两件看Haskell来源的东西......以下是黑线鳕


Dan*_*her 12

一些狂热的C++程序员声称他的程序只需要8秒钟.

那是挂钟时间还是CPU时间?

如果挂钟,并且任务分为100个CPU,比如说它不是很令人印象深刻(它很不错),如果分成1000个,那就太可怜了.

如果是CPU时间:

我很确定通过实际筛分达到10 11来达不到时间.

在此之前有几个超过4×10 9个素数,假设有一个正常的2-3GHz CPU,每个素数就有4-6个周期.

你不能用Eratosthenes筛子,也不能用阿特金筛子来实现.必须对每个填料进行检查和计数,每个复合物都标记为并检查.这给出了筛子中每个数字两个循环的理论下限,不计算例如阵列初始化,循环边界检查,循环变量更新,冗余标记.你不会接近理论界限.

一些数据点:

丹尼尔伯恩斯坦的素原(阿特金筛选),筛分块调整为充分利用我的32KB L1缓存,需要90秒才能将质数筛选到10 11并计算它们(234秒,默认筛块大小为8K在我的Core i5 2410M(2.3GHz)上.(它针对高达2 32的范围进行了大量优化,但高于此范围,它变得明显变慢,对于极限10 9,时间为0.49和0.64秒.)

我分段的Eratosthenes筛子,使用一些未暴露的内部构件以避免列表创建,在340秒内筛分并计数到10 11(嗅闻: - /,但是,嘿,10 9它花了2.92秒 - 它越来越近了,介于10之间12和10 13它取代了primegen :)使用暴露的接口创建一个素数列表大致加倍所花费的时间,就像用32位GHC编译它一样.

所以我打赌报告的8秒时间 - 如果是CPU时间 - 是否正确,算法计算素数的数量而不实际筛选整个方式.正如指出应用性的回答,可以做很多更快.

dafis@schwartz:~/Haskell/Repos/arithmoi> time tests/primeCount 100000000000
4118054813

real    0m0.145s
user    0m0.139s
sys     0m0.006s
Run Code Online (Sandbox Code Playgroud)

请注意,10 ^ 11对于32位算术来说太大了.此外,10 ^ 11位= 12.5 GB,太多的数据不适合Haskell的32位地址空间.所以你不能一次在内存中拥有整个位图.

要筛选该范围,您必须使用分段筛.即使您不受32位地址空间的限制,使用如此大的阵列也会因频繁的高速缓存未命中而产生极差的性能.您的程序将花费大部分时间等待从主内存传输的数据.筛选适合你的L2缓存的块(我没有成功通过使筛网适合L1来使其更快,我想GHC运行时的开销太大而无法使其工作).

此外,从筛子中消除一些小质数的倍数,这减少了所需的工作,并且通过使筛子更小来进一步提高性能.消除偶数是微不足道的,3倍的倍数,5的倍数不是很难.

最后,请注意,小于10 ^ 11的素数只是小于2 ^ 32的阴影,因此您无法同时存储实际的整数.

如果将筛子存储为位阵列表,删除了2,3和5的倍数,则需要大约3.3GB来存储块,所以如果你真的可以有4GB,那么它就适合了.但是你应该让你不再需要的块立即被垃圾收集.

(如果重要的话,我当前的实现使用IOUArray Integer Bool和多个线程来处理搜索空间的独立子范围.目前从10MB数组块中删除所有2的倍数需要几秒钟...)

这非常重要.

  • 使用Int的指数和unsafeRead/ unsafeWrite读取和修改数组.Integer计算比Int计算慢得多,并且你得到的边界检查readArray/ writeArray 真的很痛.
  • 10MB的块太大了,你丢失了缓存局部性.最多使用几百KB的块(L2缓存减去一些空间以用于其他所需的东西).
  • 尽管如此,即使使用Integer索引,边界检查和10MB块,也不应该花费几秒钟来删除2的倍数.我们能看到您的代码吗?

假期后更新:

在没有深刻魔法的情况下,八分钟可以筛选到10 11的素数.我不知道从一个到四个内核如何能够产生八倍的加速,因为这里应该没有缓存效果,但无论如何,如果没有看到代码,我无法调查.

那么让我们来看看你的代码.

首先,不正确:

vs <-
  mapM
    (\ start -> do
      let block = (start, start + block_size)
      v <- newEmptyMVar
      writeChan queue $ do
        say $ "New chunk " ++ show block
        chunk <- chunk_new block
        sieve_top base chunk
        c <- chunk_count chunk
        putMVar v c
      return v
    )
    (takeWhile (< target) $ iterate (+block_size) base_max)
Run Code Online (Sandbox Code Playgroud)

数字base_max + k*block_size每个都出现在两个块中,如果它们中的任何一个是素数,那么素数被计算两次,你也应该限制上限target.

现在到性能方面:

即跳出一件事是,它是真正的健谈,所以健谈,它是可衡量的,一旦你已经调整block_size到缓存中(我花了512KB二级缓存256KB块) -那么线程由战斗放缓stdoutif prime < 100 then say $ "Sieve " ++ show prime else return ()消息.

让我们来看看你的(沉默的)筛分循环:

chunk_sieve :: Chunk -> Integer -> IO ()
chunk_sieve array prime = do
  (min, max) <- getBounds array
  let n0 = min `mod` prime
  let n1 = if n0 == 0 then min else min - n0 + prime
  mapM_
    (\ n -> writeArray array n (n == prime))
    (takeWhile (<= max) $ iterate (+prime) n1)
Run Code Online (Sandbox Code Playgroud)

花费时间的一件事是将每个指数与标记掉其倍数的素数进行比较.每个单一的比较都很便宜(虽然比比较贵很多Int),但是大量的比较,只有其中一个比较可能会True增加.无条件地写入False并且如果必要True的话,在循环之后写入素数索引会产生相当大的加速.

出于计时目的,我已将目标降低到10 9并在两个核心上运行.原始代码花了155s(已过去,292s用户),减少了block_size148s,沉默了143s.省略比较,

mapM_
  (\ n -> writeArray array n False)
  (takeWhile (<= max) $ iterate (+prime) n1)
when (min <= prime && prime <= max) $ writeArray array prime True
Run Code Online (Sandbox Code Playgroud)

它运行在131s.

现在是时候进行一些更大的加速了.我是否已经提到边界检查花费了很多时间?由于循环条件保证不会尝试越界访问(并且质数足够小以至于不会Int发生溢出),我们应该使用未经检查的访问:

chunk_sieve :: Chunk -> Integer -> IO ()
chunk_sieve array prime = do
  (min, max) <- getBounds array
  let n0 = min `mod` prime
      n1 = if n0 == 0 then min else min - n0 + prime
      n2 = fromInteger (n1 - min)
      mx = fromInteger (max - min)
      pr = fromInteger prime
  mapM_
    (\ n -> unsafeWrite array n False)
    (takeWhile (<= mx) $ iterate (+pr) n2)
  when (min <= prime && prime <= max) $ writeArray array prime True
Run Code Online (Sandbox Code Playgroud)

这将运行时间缩短到96秒.好多了,但仍然很糟糕.罪魁祸首是

takeWhile (<= mx) $ iterate (+pr) n2
Run Code Online (Sandbox Code Playgroud)

GHC不能很好地融合这个组合,你得到一个Int遍历的盒装列表.用算术序列替换它,[n2, n2+pr .. mx]GHC愉快地使用未装箱的Int#s 创建一个循环,37秒.

好多了,但仍然很糟糕.现在最大的时间消费者是

chunk_count :: Chunk -> IO Integer
chunk_count array = do
    (min, max) <- getBounds array
    work min max 0
  where
    work i max count = do
      b <- readArray array i
      let count' = count + if b then 1 else 0
      evaluate count'
      let i' = i+1
      if i' > max
        then return count'
        else work i' max count'
Run Code Online (Sandbox Code Playgroud)

同样,边界检查花费了大量时间.同

chunk_count :: Chunk -> IO Integer
chunk_count array = do
    (min, max) <- getBounds array
    work 0 (fromInteger (max-min)) 0
  where
    work i max count = do
      b <- unsafeRead array i
      let count' = count + if b then 1 else 0
      evaluate count'
      let i' = i+1
      if i' > max
        then return count'
        else work i' max count'
Run Code Online (Sandbox Code Playgroud)

我们降到15秒.现在,这evaluate count'是一个有点昂贵的方式来work严格count.使用else work i' max $! count'的最后一行,而不是evaluate减少了运行时间13秒.定义work更合适(至少GHC)的方式,

chunk_count :: Chunk -> IO Integer
chunk_count array = do
    (min, max) <- getBounds array
    let mx = fromInteger (max-min)
        work i !ct
            | mx < i    = return ct
            | otherwise = do
                b <- unsafeRead array i
                work (i+1) (if b then ct+1 else ct)
    work 0 0
Run Code Online (Sandbox Code Playgroud)

将时间缩短到6.55秒.现在我们处于一个say $ "New chunk " ++ show block可衡量的差异的情况下,禁用使我们下降到6.18秒.

但是,通过从阵列中读取一个字节来计数设置位,屏蔽掉不需要的位并与每个单独的位进行比较并不是最有效的方法.Word从数组中读取整个s(通过castIOUArray)并使用popCount,如果"你知道你在做什么......",这会让我们降到4.25秒; 当素数的平方变得大于块的上限时停止标记

sieve_top :: Chunk -> Chunk -> IO ()
sieve_top base chunk = work 2
  where
    work prime = do
      chunk_sieve chunk prime
      mp <- chunk_next_prime base prime
      case mp of
        Nothing -> return ()
        Just p' -> do
            (_,mx) <- getBounds chunk
            when (p'*p' <= mx) $ work p'
Run Code Online (Sandbox Code Playgroud)

到3.9秒.仍然没有壮观,但考虑到我们开始的地方,还不错.只是为了说明缓存局部性在其他不良行为减少后的重要性:原始10MB块大小的相同代码需要8.5秒.

您的代码中的另一个小问题是所有线程都使用相同的可变小素数数组进行筛选.由于它是可变的,因此必须同步对它的访问,这会增加一些开销.只有两个线程,开销不是太大,使用不可变副本来进行筛选只会将时间减少到3.75秒,但我希望更多线程的效果会更大.(我只有两个物理内核 - 超线程 - 所以使用两个以上的线程进行相同类型的工作会引入减速,这可能会使得结果无效,但使用四个线程,我使用不可变数组获得4.55秒而5.3秒使用可变数组.这似乎证实了不断增长的同步开销.)

通过消除更多的Integer计算并为GHC的优化器编写代码(更多的工作者/包装器转换,一些静态参数转换),仍然可以获得一些,但不是很多,可能是10-15%.

通过从筛子中消除偶数来获得下一个重大改进.这将工作,分配和运行时间减少了一半以上.没有主要筛子应该考虑偶数,真的,这只是毫无意义的工作浪费.