IVl*_*lad 35 algorithm f# sieve-of-eratosthenes
我对纯粹功能性F#中的eratosthenes筛子的实施很感兴趣.我对实际筛子的实现感兴趣,而不是真正的筛子的天真功能实现,所以不是这样的:
let rec PseudoSieve list =
match list with
| hd::tl -> hd :: (PseudoSieve <| List.filter (fun x -> x % hd <> 0) tl)
| [] -> []
Run Code Online (Sandbox Code Playgroud)
上面的第二个链接简要描述了一个需要使用多图的算法,据我所知,这在F#中是不可用的.给出的Haskell实现使用了一个支持insertWith方法的映射,我在F#功能映射中没有看到它.
有没有人知道将给定的Haskell映射代码转换为F#的方法,或者可能知道替代实现方法或筛选算法哪些有效且更适合功能实现或F#?
gra*_*bot 16
阅读那篇文章后,我提出了一个不需要多图的想法.它通过一次又一次地将碰撞键向前移动其素值来处理碰撞地图键,直到它到达不在地图中的键.下面primes是一个映射,其中包含下一个迭代器值的键和值为素数的值.
let primes =
let rec nextPrime n p primes =
if primes |> Map.containsKey n then
nextPrime (n + p) p primes
else
primes.Add(n, p)
let rec prime n primes =
seq {
if primes |> Map.containsKey n then
let p = primes.Item n
yield! prime (n + 1) (nextPrime (n + p) p (primes.Remove n))
else
yield n
yield! prime (n + 1) (primes.Add(n * n, n))
}
prime 2 Map.empty
Run Code Online (Sandbox Code Playgroud)
下面是从优先级队列基于算法纸不使用方优化.我将通用优先级队列函数放在顶部.我用一个元组来表示惰性列表迭代器.
let primes() =
// the priority queue functions
let insert = Heap.Insert
let findMin = Heap.Min
let insertDeleteMin = Heap.DeleteInsert
// skips primes 2, 3, 5, 7
let wheelData = [|2L;4L;2L;4L;6L;2L;6L;4L;2L;4L;6L;6L;2L;6L;4L;2L;6L;4L;6L;8L;4L;2L;4L;2L;4L;8L;6L;4L;6L;2L;4L;6L;2L;6L;6L;4L;2L;4L;6L;2L;6L;4L;2L;4L;2L;10L;2L;10L|]
// increments iterator
let wheel (composite, n, prime) =
composite + wheelData.[n % 48] * prime, n + 1, prime
let insertPrime prime n table =
insert (prime * prime, n, prime) table
let rec adjust x (table : Heap) =
let composite, n, prime = findMin table
if composite <= x then
table
|> insertDeleteMin (wheel (composite, n, prime))
|> adjust x
else
table
let rec sieve iterator table =
seq {
let x, n, _ = iterator
let composite, _, _ = findMin table
if composite <= x then
yield! sieve (wheel iterator) (adjust x table)
else
if x = 13L then
yield! [2L; 3L; 5L; 7L; 11L]
yield x
yield! sieve (wheel iterator) (insertPrime x n table)
}
sieve (13L, 1, 1L) (insertPrime 11L 0 (Heap(0L, 0, 0L)))
Run Code Online (Sandbox Code Playgroud)
这是基于优先级队列的算法和方形优化.为了便于延迟向查询表添加素数,必须将轮偏移与素数值一起返回.此版本的算法具有O(sqrt(n))内存使用,其中无优化的一个是O(n).
let rec primes2() : seq<int64 * int> =
// the priority queue functions
let insert = Heap.Insert
let findMin = Heap.Min
let insertDeleteMin = Heap.DeleteInsert
// increments iterator
let wheel (composite, n, prime) =
composite + wheelData.[n % 48] * prime, n + 1, prime
let insertPrime enumerator composite table =
// lazy initialize the enumerator
let enumerator =
if enumerator = null then
let enumerator = primes2().GetEnumerator()
enumerator.MoveNext() |> ignore
// skip primes that are a part of the wheel
while fst enumerator.Current < 11L do
enumerator.MoveNext() |> ignore
enumerator
else
enumerator
let prime = fst enumerator.Current
// Wait to insert primes until their square is less than the tables current min
if prime * prime < composite then
enumerator.MoveNext() |> ignore
let prime, n = enumerator.Current
enumerator, insert (prime * prime, n, prime) table
else
enumerator, table
let rec adjust x table =
let composite, n, prime = findMin table
if composite <= x then
table
|> insertDeleteMin (wheel (composite, n, prime))
|> adjust x
else
table
let rec sieve iterator (enumerator, table) =
seq {
let x, n, _ = iterator
let composite, _, _ = findMin table
if composite <= x then
yield! sieve (wheel iterator) (enumerator, adjust x table)
else
if x = 13L then
yield! [2L, 0; 3L, 0; 5L, 0; 7L, 0; 11L, 0]
yield x, n
yield! sieve (wheel iterator) (insertPrime enumerator composite table)
}
sieve (13L, 1, 1L) (null, insert (11L * 11L, 0, 11L) (Heap(0L, 0, 0L)))
Run Code Online (Sandbox Code Playgroud)
这是我的测试程序.
type GenericHeap<'T when 'T : comparison>(defaultValue : 'T) =
let mutable capacity = 1
let mutable values = Array.create capacity defaultValue
let mutable size = 0
let swap i n =
let temp = values.[i]
values.[i] <- values.[n]
values.[n] <- temp
let rec rollUp i =
if i > 0 then
let parent = (i - 1) / 2
if values.[i] < values.[parent] then
swap i parent
rollUp parent
let rec rollDown i =
let left, right = 2 * i + 1, 2 * i + 2
if right < size then
if values.[left] < values.[i] then
if values.[left] < values.[right] then
swap left i
rollDown left
else
swap right i
rollDown right
elif values.[right] < values.[i] then
swap right i
rollDown right
elif left < size then
if values.[left] < values.[i] then
swap left i
member this.insert (value : 'T) =
if size = capacity then
capacity <- capacity * 2
let newValues = Array.zeroCreate capacity
for i in 0 .. size - 1 do
newValues.[i] <- values.[i]
values <- newValues
values.[size] <- value
size <- size + 1
rollUp (size - 1)
member this.delete () =
values.[0] <- values.[size]
size <- size - 1
rollDown 0
member this.deleteInsert (value : 'T) =
values.[0] <- value
rollDown 0
member this.min () =
values.[0]
static member Insert (value : 'T) (heap : GenericHeap<'T>) =
heap.insert value
heap
static member DeleteInsert (value : 'T) (heap : GenericHeap<'T>) =
heap.deleteInsert value
heap
static member Min (heap : GenericHeap<'T>) =
heap.min()
type Heap = GenericHeap<int64 * int * int64>
let wheelData = [|2L;4L;2L;4L;6L;2L;6L;4L;2L;4L;6L;6L;2L;6L;4L;2L;6L;4L;6L;8L;4L;2L;4L;2L;4L;8L;6L;4L;6L;2L;4L;6L;2L;6L;6L;4L;2L;4L;6L;2L;6L;4L;2L;4L;2L;10L;2L;10L|]
let primes() =
// the priority queue functions
let insert = Heap.Insert
let findMin = Heap.Min
let insertDeleteMin = Heap.DeleteInsert
// increments iterator
let wheel (composite, n, prime) =
composite + wheelData.[n % 48] * prime, n + 1, prime
let insertPrime prime n table =
insert (prime * prime, n, prime) table
let rec adjust x (table : Heap) =
let composite, n, prime = findMin table
if composite <= x then
table
|> insertDeleteMin (wheel (composite, n, prime))
|> adjust x
else
table
let rec sieve iterator table =
seq {
let x, n, _ = iterator
let composite, _, _ = findMin table
if composite <= x then
yield! sieve (wheel iterator) (adjust x table)
else
if x = 13L then
yield! [2L; 3L; 5L; 7L; 11L]
yield x
yield! sieve (wheel iterator) (insertPrime x n table)
}
sieve (13L, 1, 1L) (insertPrime 11L 0 (Heap(0L, 0, 0L)))
let rec primes2() : seq<int64 * int> =
// the priority queue functions
let insert = Heap.Insert
let findMin = Heap.Min
let insertDeleteMin = Heap.DeleteInsert
// increments iterator
let wheel (composite, n, prime) =
composite + wheelData.[n % 48] * prime, n + 1, prime
let insertPrime enumerator composite table =
// lazy initialize the enumerator
let enumerator =
if enumerator = null then
let enumerator = primes2().GetEnumerator()
enumerator.MoveNext() |> ignore
// skip primes that are a part of the wheel
while fst enumerator.Current < 11L do
enumerator.MoveNext() |> ignore
enumerator
else
enumerator
let prime = fst enumerator.Current
// Wait to insert primes until their square is less than the tables current min
if prime * prime < composite then
enumerator.MoveNext() |> ignore
let prime, n = enumerator.Current
enumerator, insert (prime * prime, n, prime) table
else
enumerator, table
let rec adjust x table =
let composite, n, prime = findMin table
if composite <= x then
table
|> insertDeleteMin (wheel (composite, n, prime))
|> adjust x
else
table
let rec sieve iterator (enumerator, table) =
seq {
let x, n, _ = iterator
let composite, _, _ = findMin table
if composite <= x then
yield! sieve (wheel iterator) (enumerator, adjust x table)
else
if x = 13L then
yield! [2L, 0; 3L, 0; 5L, 0; 7L, 0; 11L, 0]
yield x, n
yield! sieve (wheel iterator) (insertPrime enumerator composite table)
}
sieve (13L, 1, 1L) (null, insert (11L * 11L, 0, 11L) (Heap(0L, 0, 0L)))
let mutable i = 0
let compare a b =
i <- i + 1
if a = b then
true
else
printfn "%A %A %A" a b i
false
Seq.forall2 compare (Seq.take 50000 (primes())) (Seq.take 50000 (primes2() |> Seq.map fst))
|> printfn "%A"
primes2()
|> Seq.map fst
|> Seq.take 10
|> Seq.toArray
|> printfn "%A"
primes2()
|> Seq.map fst
|> Seq.skip 999999
|> Seq.take 10
|> Seq.toArray
|> printfn "%A"
System.Console.ReadLine() |> ignore
Run Code Online (Sandbox Code Playgroud)
Although there has been one answer giving an algorithm using a Priority Queue (PQ) as in a SkewBinomialHeap, it is perhaps not the right PQ for the job. What the incremental Sieve of Eratosthenes (iEoS) requires is a PQ that has excellent performance for getting the minimum value and reinserting values mostly slightly further down the queue but doesn't need the ultimate in performance for adding new values as iSoE only adds as new values a total of the primes up to the the square root of the range (which is a tiny fraction of the number of re-insertions that occur once per reduction). The SkewBinomialHeap PQ doesn't really give much more than using the built-in Map which uses a balanced binary search tree - 所有O(log n)操作 - 除了它稍微改变了操作的权重以支持SoE的要求.但是,SkewBinaryHeap每次还原仍需要很多O(log n)操作.
A PQ implemented as a Heap in more particular as a Binary Heap and even more particularly as a MinHeap pretty much satisfies iSoE's requirements with O(1) performance in getting the minimum and O(log n) performance for re-insertions and adding new entries, although the performance is actually a fraction of O(log n) as most of the re-insertions occur near the top of the queue and most of the additions of new values (which don't matter as they are infrequent) occur near the end of the queue where these operations are most efficient. In addition, the MinHeap PQ can efficiently implement the delete minimum and insert function in one (generally a fraction of) one O(log n) pass. Then, rather than for the Map (which is implemented as an AVL tree) where there is one O(log n) operation with generally a full 'log n' range due to the minimum value we require being at the far left last leaf of the tree, we are generally adding and removing the minimum at the root and inserting on the average of a few levels down in one pass. Thus the MinHeap PQ can be used with only one fraction of O(log n) operation per culling reduction rather than multiple larger fraction O(log n) operations.
The MinHeap PQ can be implemented with pure functional code (with no "removeMin" implemented as the iSoE doesn't require it but there is an "adjust" function for use in segmentation), as follows:
[<RequireQualifiedAccess>]
module MinHeap =
type MinHeapTreeEntry<'T> = class val k:uint32 val v:'T new(k,v) = { k=k;v=v } end
[<CompilationRepresentation(CompilationRepresentationFlags.UseNullAsTrueValue)>]
[<NoEquality; NoComparison>]
type MinHeapTree<'T> =
| HeapEmpty
| HeapOne of MinHeapTreeEntry<'T>
| HeapNode of MinHeapTreeEntry<'T> * MinHeapTree<'T> * MinHeapTree<'T> * uint32
let empty = HeapEmpty
let getMin pq = match pq with | HeapOne(kv) | HeapNode(kv,_,_,_) -> Some kv | _ -> None
let insert k v pq =
let kv = MinHeapTreeEntry(k,v)
let rec insert' kv msk pq =
match pq with
| HeapEmpty -> HeapOne kv
| HeapOne kv2 -> if k < kv2.k then HeapNode(kv,pq,HeapEmpty,2u)
else let nn = HeapOne kv in HeapNode(kv2,nn,HeapEmpty,2u)
| HeapNode(kv2,l,r,cnt) ->
let nc = cnt + 1u
let nmsk = if msk <> 0u then msk <<< 1
else let s = int32 (System.Math.Log (float nc) / System.Math.Log(2.0))
(nc <<< (32 - s)) ||| 1u //never ever zero again with the or'ed 1
if k <= kv2.k then if (nmsk &&& 0x80000000u) = 0u then HeapNode(kv,insert' kv2 nmsk l,r,nc)
else HeapNode(kv,l,insert' kv2 nmsk r,nc)
else if (nmsk &&& 0x80000000u) = 0u then HeapNode(kv2,insert' kv nmsk l,r,nc)
else HeapNode(kv2,l,insert' kv nmsk r,nc)
insert' kv 0u pq
let private reheapify kv k pq =
let rec reheapify' pq =
match pq with
| HeapEmpty -> HeapEmpty //should never be taken
| HeapOne kvn -> HeapOne kv
| HeapNode(kvn,l,r,cnt) ->
match r with
| HeapOne kvr when k > kvr.k ->
match l with //never HeapEmpty
| HeapOne kvl when k > kvl.k -> //both qualify, choose least
if kvl.k > kvr.k then HeapNode(kvr,l,HeapOne kv,cnt)
else HeapNode(kvl,HeapOne kv,r,cnt)
| HeapNode(kvl,_,_,_) when k > kvl.k -> //both qualify, choose least
if kvl.k > kvr.k then HeapNode(kvr,l,HeapOne kv,cnt)
else HeapNode(kvl,reheapify' l,r,cnt)
| _ -> HeapNode(kvr,l,HeapOne kv,cnt) //only right qualifies
| HeapNode(kvr,_,_,_) when k > kvr.k -> //need adjusting for left leaf or else left leaf
match l with //never HeapEmpty or HeapOne
| HeapNode(kvl,_,_,_) when k > kvl.k -> //both qualify, choose least
if kvl.k > kvr.k then HeapNode(kvr,l,reheapify' r,cnt)
else HeapNode(kvl,reheapify' l,r,cnt)
| _ -> HeapNode(kvr,l,reheapify' r,cnt) //only right qualifies
| _ -> match l with //r could be HeapEmpty but l never HeapEmpty
| HeapOne(kvl) when k > kvl.k -> HeapNode(kvl,HeapOne kv,r,cnt)
| HeapNode(kvl,_,_,_) when k > kvl.k -> HeapNode(kvl,reheapify' l,r,cnt)
| _ -> HeapNode(kv,l,r,cnt) //just replace the contents of pq node with sub leaves the same
reheapify' pq
let reinsertMinAs k v pq =
let kv = MinHeapTreeEntry(k,v)
reheapify kv k pq
let adjust f (pq:MinHeapTree<_>) = //adjust all the contents using the function, then rebuild by reheapify
let rec adjust' pq =
match pq with
| HeapEmpty -> pq
| HeapOne kv -> HeapOne(MinHeapTreeEntry(f kv.k kv.v))
| HeapNode (kv,l,r,cnt) -> let nkv = MinHeapTreeEntry(f kv.k kv.v)
reheapify nkv nkv.k (HeapNode(kv,adjust' l,adjust' r,cnt))
adjust' pq
Run Code Online (Sandbox Code Playgroud)
Using the above module, the iSoE can be written with the wheel factorization optimizations and using efficient Co-Inductive Streams (CIS's) as follows:
type CIS<'T> = class val v:'T val cont:unit->CIS<'T> new(v,cont) = { v=v;cont=cont } end
type cullstate = struct val p:uint32 val wi:int new(cnd,cndwi) = { p=cnd;wi=cndwi } end
let primesPQWSE() =
let WHLPRMS = [| 2u;3u;5u;7u |] in let FSTPRM = 11u in let WHLCRC = int (WHLPRMS |> Seq.fold (*) 1u) >>> 1
let WHLLMT = int (WHLPRMS |> Seq.fold (fun o n->o*(n-1u)) 1u) - 1
let WHLPTRN =
let wp = Array.zeroCreate (WHLLMT+1)
let gaps (a:int[]) = let rec gap i acc = if a.[i]=0 then gap (i+1) (acc+1uy) else acc
{0..WHLCRC-1} |> Seq.fold (fun s i->
let ns = if a.[i]<>0 then wp.[s]<-2uy*gap (i+1) 1uy;(s+1) else s in ns) 0 |> ignore
Array.init (WHLCRC+1) (fun i->if WHLPRMS |> Seq.forall (fun p->(FSTPRM+uint32(i<<<1))%p<>0u)
then 1 else 0) |> gaps;wp
let inline whladv i = if i < WHLLMT then i + 1 else 0 in let advcnd c i = c + uint32 WHLPTRN.[i]
let inline culladv c p i = let n = c + uint32 WHLPTRN.[i] * p in if n < c then 0xFFFFFFFFu else n
let rec mkprm (n,wi,pq,(bps:CIS<_>),q) =
let nxt = advcnd n wi in let nxti = whladv wi
if nxt < n then (0u,0,(0xFFFFFFFFu,0,MinHeap.empty,bps,q))
elif n>=q then let bp,bpi = bps.v in let nc,nci = culladv n bp bpi,whladv bpi
let nsd = bps.cont() in let np,_ = nsd.v in let sqr = if np>65535u then 0xFFFFFFFFu else np*np
mkprm (nxt,nxti,(MinHeap.insert nc (cullstate(bp,nci)) pq),nsd,sqr)
else match MinHeap.getMin pq with | None -> (n,wi,(nxt,nxti,pq,bps,q))
| Some kv -> let ca,cs = culladv kv.k kv.v.p kv.v.wi,cullstate(kv.v.p,whladv kv.v.wi)
if n>kv.k then mkprm (n,wi,(MinHeap.reinsertMinAs ca cs pq),bps,q)
elif n=kv.k then mkprm (nxt,nxti,(MinHeap.reinsertMinAs ca cs pq),bps,q)
else (n,wi,(nxt,nxti,pq,bps,q))
let rec pCID p pi pq bps q = CIS((p,pi),fun()->let (np,npi,(nxt,nxti,npq,nbps,nq))=mkprm (advcnd p pi,whladv pi,pq,bps,q)
pCID np npi npq nbps nq)
let rec baseprimes() = CIS((FSTPRM,0),fun()->let np=FSTPRM+uint32 WHLPTRN.[0]
pCID np (whladv 0) MinHeap.empty (baseprimes()) (FSTPRM*FSTPRM))
let genseq sd = Seq.unfold (fun (p,pi,pcc) ->if p=0u then None else Some(p,mkprm pcc)) sd
seq { yield! WHLPRMS; yield! mkprm (FSTPRM,0,MinHeap.empty,baseprimes(),(FSTPRM*FSTPRM)) |> genseq }
Run Code Online (Sandbox Code Playgroud)
The above code calculates the first 100,000 primes in about 0.077 seconds, the first 1,000,000 primes in 0.977 seconds, the first 10,000,000 primes in about 14.33 seconds, and the first 100,000,000 primes in about 221.87 seconds, all on an i7-2700K (3.5GHz) as 64-bit code. This purely functional code is slightly faster than that of Dustin Cambell's mutable Dictionary based code with the added common optimizations of wheel factorization, deferred adding of base primes, and use of the more efficient CID's all added (tryfsharp and ideone) but is still pure functional code where his using the Dictionary class is not. However, for larger prime ranges of about of about two billion (about 100 million primes), the code using the hash table based Dictionary will be faster as the Dictionary operations do not have a O(log n) factor and this gain overcomes the computational complexity of using Dictionary hash tables.
The above program has the further feature that the factorization wheel is parameterized so that, for instance, one can use a extremely large wheel by setting WHLPRMS to [| 2u;3u;5u;7u;11u;13u;17u;19u |] and FSTPRM to 23u to get a run time of about two thirds for large ranges at about 9.34 seconds for ten million primes, although note that it takes several seconds to compute the WHLPTRN before the program starts to run, which is a constant overhead no matter the prime range.
Comparative Analysis: As compared to the pure functional incremental tree folding implementation, this algorithm is just slightly faster because the average used height of the MinHeap tree is less by a factor of two than the depth of the folded tree but that is offset by an equivalent constant factor loss in efficiency in ability to traverse the PQ tree levels due to it being based on a binary heap requiring processing of both the right and left leaves for every heap level and a branch either way rather than a single comparison per level for the tree folding with generally the less deep branch the taken one. As compared to other PQ and Map based functional algorithms, improvements are generally by a constant factor in reducing the number of O(log n) operations in traversing each level of the respective tree structures.
The MinHeap is usually implemented as a mutable array binary heap after a genealogical tree based model invented by Michael Eytzinger over 400 years ago. I know the question said there was no interest in non-functional mutable code, but if one must avoid all sub code that uses mutability, then we couldn't use list's or LazyList's which use mutability "under the covers" for performance reasons. So imagine that the following alternate mutable version of the MinHeap PQ is as supplied by a library and enjoy another factor of over two for larger prime ranges in performance:
[<RequireQualifiedAccess>]
module MinHeap =
type MinHeapTreeEntry<'T> = class val k:uint32 val v:'T new(k,v) = { k=k;v=v } end
type MinHeapTree<'T> = ResizeArray<MinHeapTreeEntry<'T>>
let empty<'T> = MinHeapTree<MinHeapTreeEntry<'T>>()
let getMin (pq:MinHeapTree<_>) = if pq.Count > 0 then Some pq.[0] else None
let insert k v (pq:MinHeapTree<_>) =
if pq.Count = 0 then pq.Add(MinHeapTreeEntry(0xFFFFFFFFu,v)) //add an extra entry so there's always a right max node
let mutable nxtlvl = pq.Count in let mutable lvl = nxtlvl <<< 1 //1 past index of value added times 2
pq.Add(pq.[nxtlvl - 1]) //copy bottom entry then do bubble up while less than next level up
while ((lvl <- lvl >>> 1); nxtlvl <- nxtlvl >>> 1; nxtlvl <> 0) do
let t = pq.[nxtlvl - 1] in if t.k > k then pq.[lvl - 1] <- t else lvl <- lvl <<< 1; nxtlvl <- 0 //causes loop break
pq.[lvl - 1] <- MinHeapTreeEntry(k,v); pq
let reinsertMinAs k v (pq:MinHeapTree<_>) = //do minify down for value to insert
let mutable nxtlvl = 1 in let mutable lvl = nxtlvl in let cnt = pq.Count
while (nxtlvl <- nxtlvl <<< 1; nxtlvl < cnt) do
let lk = pq.[nxtlvl - 1].k in let rk = pq.[nxtlvl].k in let oldlvl = lvl
let k = if k > lk then lvl <- nxtlvl; lk else k in if k > rk then nxtlvl <- nxtlvl + 1; lvl <- nxtlvl
if lvl <> oldlvl then pq.[oldlvl - 1] <- pq.[lvl - 1] else nxtlvl <- cnt //causes loop break
pq.[lvl - 1] <- MinHeapTreeEntry(k,v); pq
let adjust f (pq:MinHeapTree<_>) = //adjust all the contents using the function, then re-heapify
if pq <> null then
let cnt = pq.Count
if cnt > 1 then
for i = 0 to cnt - 2 do //change contents using function
let e = pq.[i] in let k,v = e.k,e.v in pq.[i] <- MinHeapTreeEntry (f k v)
for i = cnt/2 downto 1 do //rebuild by reheapify
let kv = pq.[i - 1] in let k = kv.k
let mutable nxtlvl = i in let mutable lvl = nxtlvl
while (nxtlvl <- nxtlvl <<< 1; nxtlvl < cnt) do
let lk = pq.[nxtlvl - 1].k in let rk = pq.[nxtlvl].k in let oldlvl = lvl
let k = if k > lk then lvl <- nxtlvl; lk else k in if k > rk then nxtlvl <- nxtlvl + 1; lvl <- nxtlvl
if lvl <> oldlvl then pq.[oldlvl - 1] <- pq.[lvl - 1] else nxtlvl <- cnt //causes loop break
pq.[lvl - 1] <- kv
pq
Run Code Online (Sandbox Code Playgroud)
Geek note: I had actually expected the mutable version to offer a much better improved performance ratio, but it bogs down in the re-insertions due to the nested if-then-else code structure and the random behavior of the prime cull values meaning that the CPU branch prediction fails for a large proportion of the branches resulting in many additional 10's of CPU clock cycles per cull reduction to rebuilt the instruction pre-fetch cache.
The only other constant factor performance gains on this algorithm would be segmentation and use of multi-tasking for a performance gain proportional to the number of CPU cores; however, as it stands, this is the fastest pure functional SoE algorithm to date, and even the pure functional form using the functional MinHeap beats simplistic imperative implementations such as Jon Harrop's code or Johan Kullbom's Sieve of Atkin (which is in error in his timing as he only calculated the primes to 10 million rather than the 10 millionth prime), but those algorithms would be about five times faster if better optimizations were used. That ratio of about five between functional and imperative code will be somewhat reduced when we add multi-threading of larger wheel factorization as the computational complexity of the imperative code increases faster than the functional code and multi-threading helps the slower functional code more than the faster imperative code as the latter gets closer to the base limit of the time required to enumerate through the found primes.
EDIT_ADD: Even though one could elect to continue to use the pure functional version of MinHeap, adding efficient segmentation in preparation for multi-threading would slightly "break" the "pureness" of the functional code as follows: 1) The most efficient way of transferring a representation of composite-culled primes is a packed-bit array the size of the segment, 2) While the size of the array is known, using an array comprehension to initialize it in a functional way isn't efficient as it uses "ResizeArray" under the covers which needs to copy itself for every x additions (I think 'x' is eight for the current implementation) and using Array.init doesn't work as many values at particular indexes are skipped, 3) Therefore, the easiest way to fill the culled-composite array is to zeroCreate it of the correct size and then run an initialization function which could write to each mutable array index no more than once. Although this isn't strictly "functional", it is close in that the array is initialized and then never modified again.
The code with added segmentation, multi-threading, programmable wheel factorial circumference, and many performance tweaks is as follows (other than some added new constants, the extra tuned code to implement the segmentation and multi-threading is the bottom approximately half of the code starting at the "prmspg" function):
type prmsCIS = class val pg:uint16 val bg:uint16 val pi:int val cont:unit->prmsCIS
new(pg,bg,pi,nxtprmf) = { pg=pg;bg=bg;pi=pi;cont=nxtprmf } end
type cullstate = struct val p:uint32 val wi:int new(cnd,cndwi) = { p=cnd;wi=cndwi } end
let primesPQOWSE() =
let WHLPRMS = [| 2u;3u;5u;7u;11u;13u;17u |] in let FSTPRM = 19u in let WHLCRC = int(WHLPRMS |> Seq.fold (*) 1u)
let MXSTP = uint64(FSTPRM-1u) in let BFSZ = 1<<<11 in let NUMPRCS = System.Environment.ProcessorCount
let WHLLMT = int (WHLPRMS |> Seq.fold (fun o n->o*(n-1u)) 1u) - 1 in let WHLPTRN = Array.zeroCreate (WHLLMT+1)
let WHLRNDUP = let gaps (a:int[]) = let rec gap i acc = if a.[i]=0 then gap (i+1) (acc+1)
else acc in let b = a |> Array.scan (+) 0
Array.init (WHLCRC>>>1) (fun i->
if a.[i]=0 then 0 else let g=2*gap (i+1) 1 in WHLPTRN.[b.[i]]<-byte g;1)
Array.init WHLCRC (fun i->if WHLPRMS |> Seq.forall (fun p->(FSTPRM+uint32(i<<<1))%p<>0u) then 1 else 0)
|> gaps |> Array.scan (+) 0
let WHLPOS = WHLPTRN |> Array.map (uint32) |> Array.scan (+) 0u in let advcnd cnd cndi = cnd + uint32 WHLPTRN.[cndi]
let MINRNGSTP = if WHLLMT<=31 then uint32(32/(WHLLMT+1)*WHLCRC) else if WHLLMT=47 then uint32 WHLCRC<<<1 else uint32 WHLCRC
let MINBFRNG = uint32((BFSZ<<<3)/(WHLLMT+1)*WHLCRC)/MINRNGSTP*MINRNGSTP
let MINBFRNG = if MINBFRNG=0u then MINRNGSTP else MINBFRNG
let inline whladv i = if i < WHLLMT then i+1 else 0 in let inline culladv c p i = c+uint32 WHLPTRN.[i]*p
let rec mkprm (n,wi,pq,(bps:prmsCIS),q,lstp,bgap) =
let nxt,nxti = advcnd n wi,whladv wi
if n>=q then let p = (uint32 bps.bg<<<16)+uint32 bps.pg
let nbps,nxtcmpst,npi = bps.cont(),culladv n p bps.pi,whladv bps.pi
let pg = uint32 nbps.pg in let np = p+pg in let sqr = q+pg*((p<<<1)+pg) //only works to p < about 13 million
let nbps = prmsCIS(uint16 np,uint16(np>>>16),nbps.pi,nbps.cont) //therefore, algorithm only works to p^2 or about
mkprm (nxt,nxti,(MinHeap.insert nxtcmpst (cullstate(p,npi)) pq),nbps,sqr,lstp,(bgap+1us)) //1.7 * 10^14
else match MinHeap.getMin pq with
| None -> (uint16(n-uint32 lstp),bgap,wi,(nxt,nxti,pq,bps,q,n,1us)) //fix with q is uint64
| Some kv -> let ca,cs = culladv kv.k kv.v.p kv.v.wi,cullstate(kv.v.p,whladv kv.v.wi)
if n>kv.k then mkprm (n,wi,(MinHeap.reinsertMinAs ca cs pq),bps,q,lstp,bgap)
elif n=kv.k then mkprm (nxt,nxti,(MinHeap.reinsertMinAs ca cs pq),bps,q,lstp,(bgap+1us))
else (uint16(n-uint32 lstp),bgap,wi,(nxt,nxti,pq,bps,q,n,1us))
let rec pCIS p pg bg pi pq bps q = prmsCIS(pg,bg,pi,fun()->
let (npg,nbg,npi,(nxt,nxti,npq,nbps,nq,nl,ng))=mkprm (p+uint32 WHLPTRN.[pi],whladv pi,pq,bps,q,p,0us)
pCIS (p+uint32 npg) npg nbg npi npq nbps nq)
let rec baseprimes() = prmsCIS(uint16 FSTPRM,0us,0,fun()->
let np,npi=advcnd FSTPRM 0,whladv 0
pCIS np (uint16 WHLPTRN.[0]) 1us npi MinHeap.empty (baseprimes()) (FSTPRM*FSTPRM))
let prmspg nxt adj pq bp q =
//compute next buffer size rounded up to next even wheel circle so at least one each base prime hits the page
let rng = max (((uint32(MXSTP+uint64(sqrt (float (MXSTP*(MXSTP+4UL*nxt))))+1UL)>>>1)+MINRNGSTP)/MINRNGSTP*MINRNGSTP) MINBFRNG
let nxtp() = async {
let rec addprms pqx (bpx:prmsCIS) qx =
if qx>=adj then pqx,bpx,qx //add primes to queue for new lower limit
else let p = (uint32 bpx.bg<<<16)+uint32 bpx.pg in let nbps = bpx.cont()
let pg = uint32 nbps.pg in let np = p+pg in let sqr = qx+pg*((p<<<1)+pg)
let nbps = prmsCIS(uint16 np,uint16(np>>>16),nbps.pi,nbps.cont)
addprms (MinHeap.insert qx (cullstate(p,bpx.pi)) pqx) nbps sqr
let adjcinpg low k (v:cullstate) = //adjust the cull states for the new page low value
let p = v.p in let WHLSPN = int64 WHLCRC*int64 p in let db = int64 p*int64 WHLPOS.[v.wi]
let db = if k<low then let nk = int64(low-k)+db in nk-((nk/WHLSPN)*WHLSPN)
else let nk = int64(k-low) in if db<nk then db+WHLSPN-nk else db-nk
let r = WHLRNDUP.[int((((db>>>1)%(WHLSPN>>>1))+int64 p-1L)/int64 p)] in let x = int64 WHLPOS.[r]*int64 p
let r = if r>WHLLMT then 0 else r in let x = if x<db then x+WHLSPN-db else x-db in uint32 x,cullstate(p,r)
let bfbtsz = int rng/WHLCRC*(WHLLMT+1) in let nbuf = Array.zeroCreate (bfbtsz>>>5)
let rec nxtp' wi cnt = let _,nbg,_,ncnt = mkprm cnt in let nwi = wi + int nbg
if nwi < bfbtsz then nbuf.[nwi>>>5] <- nbuf.[nwi>>>5] ||| (1u<<<(nwi&&&0x1F)); nxtp' nwi ncnt
else let _,_,pq,bp,q,_,_ = ncnt in nbuf,pq,bp,q //results incl buf and cont parms for next page
let npq,nbp,nq = addprms pq bp q
return nxtp' 0 (0u,0,MinHeap.adjust (adjcinpg adj) npq,nbp,nq-adj,0u,0us) }
rng,nxtp() |> Async.StartAsTask
let nxtpg nxt (cont:(_*System.Threading.Tasks.Task<_>)[]) = //(len,pq,bp,q) =
let adj = (cont |> Seq.fold (fun s (r,_) -> s+r) 0u)
let _,tsk = cont.[0] in let _,pq,bp,q = tsk.Result
let ncont = Array.init (NUMPRCS+1) (fun i -> if i<NUMPRCS then cont.[i+1]
else prmspg (nxt+uint64 adj) adj pq bp q)
let _,tsk = ncont.[0] in let nbuf,_,_,_ = tsk.Result in nbuf,ncont
//init cond buf[0], no queue, frst bp sqr offset
let initcond = 0u,System.Threading.Tasks.Task.Factory.StartNew (fun()->
(Array.empty,MinHeap.empty,baseprimes(),FSTPRM*FSTPRM-FSTPRM))
let nxtcond n = prmspg (uint64 n) (n-FSTPRM) MinHeap.empty (baseprimes()) (FSTPRM*FSTPRM-FSTPRM)
let initcont = Seq.unfold (fun (n,((r,_)as v))->Some(v,(n+r,nxtcond (n+r)))) (FSTPRM,initcond)
|> Seq.take (NUMPRCS+1) |> Seq.toArray
let rec nxtprm (c,ci,i,buf:uint32[],cont) =
let rec nxtprm' c ci i =
let nc = c + uint64 WHLPTRN.[ci] in let nci = whladv ci in let ni = i + 1 in let nw = ni>>>5
if nw >= buf.Length then let (npg,ncont)=nxtpg nc cont in nxtprm (c,ci,-1,npg,ncont)
elif (buf.[nw] &&& (1u <<< (ni &&& 0x1F))) = 0u then nxtprm' nc nci ni
else nc,nci,ni,buf,cont
nxtprm' c ci i
seq { yield! WHLPRMS |> Seq.map (uint64);
yield! Seq.unfold (fun ((c,_,_,_,_) as cont)->Some(c,nxtprm cont))
(nxtprm (uint64 FSTPRM-uint64 WHLPTRN.[WHLLMT],WHLLMT,-1,Array.empty,initcont)) }
Run Code Online (Sandbox Code Playgroud)
Note that the MinHeap modules, both functional and array-based, have had an "adjust" function added to permit adjusting of the cull state of each thread's version of the PQ at the beginning of every new segment page. Also note that it was possible to adjust the code so that most of the computation is done using 32 bit ranges with the final sequence output as uint64's at little cost in computational time so that currently the theoretical range is something over 100 trillion (ten raised to the fourteen power) if one were willing to wait the about three to four months required to compute that range. The numeric range checks were removed as it is unlikely that anyone would use this algorithm to compute up to that range let alone past it.
Using the pure functional MinHeap and 2,3,5,7 wheel factorization, the above program computes the first hundred thousand, one million, ten million, and a hundred million primes in 0.062, 0.629, 10.53, and 195.62 seconds, respectively. Using the array-based MinHeap speeds this up to 0.097, 0.276, 3.48, and 51.60 seconds, respectively. Using the 2,3,5,7,11,13,17 wheel by changing WHLPRMS to "[| 2u;3u;5u;7u;11u;13u;17u |]" and FSTPRM to 19u speeds that up yet a little more to 0.181, 0.308, 2.49, and 36.58 seconds, respectively (for constant factor improvement with a constant overhead). This fastest tweak calculates the 203,280,221 primes in the 32-bit number range in about 88.37 seconds. The "BFSZ" constant can be adjusted with trade-offs between slower times for smaller ranges version faster times for larger ranges, with a value of "1<<<14" recommended to be tried for the larger ranges. This constant only sets the minimum buffer size, with the program adjusting the buffer size above that size automatically for larger ranges such that the buffer is sufficient so that the largest base prime required for the page range will always "strike" each page at least once; this means that the complexity and overhead of an additional "bucket sieve" is not required. This last fully optimized version can compute the primes up to 10 and 100 billion in about 256.8 and 3617.4 seconds (just over an hour for the 100 billion) as tested using "primesPQOWSE() |> Seq.takeWhile ((>=)100000000000UL) |> Seq.fold (fun s p -> s + 1UL) 0UL" for output. This is where the estimates of about half a day for the count of primes to a trillion, a week for up to ten trillion and about three to four months for up to a hundred trillion come from.
I don't think it's possible to make functional or almost functional code using the incremental SoE algorithm to run much faster than this. As one can see in looking at the code, optimizing the basic incremental algorithm has added greatly to the code complexity such that it is likely slightly more complex than equivalently optimized code based on straight array culling with that code able to run approximately ten times faster than this and without the extra exponent in the performance meaning that this functional incremental code has an ever increasing extra percentage overhead.
So is this useful other than from an interesting theoretical and intellectual viewpoint? Probably it's not. For smaller ranges of primes up to about ten million, the best of the basic not fully optimized incremental functional SoE's are probably adequate and quite simple to write or have less RAM memory use than the simplest imperative SoE's. However, they are much slower than more imperative code using an array so they "run out of steam" for ranges above that. While it has been demonstrated here that the code can be sped up by optimization, it is still 10's of times slower than a more imperative pure array-based version yet has added to the complexity to be at least as complex as that code with equivalent optimizations, and even that code under F# on DotNet is about four times slower than using a language such as C++ compiled directly to native code; if one really wanted to investigate large ranges of primes, one would likely use one of those other languages and techniques where primesieve can calculate the number of primes in the hundred trillion range in under four hours instead of the about three months required for this code. END_EDIT_ADD
这里使用序列对基于Eratosthenes的Sieve的算法增量(和递归)映射进行了极大优化,因为不需要对先前序列值进行记忆(除了使用Seq缓存基本素数值有一点点优势.缓存),主要优化是它对输入序列使用轮分解,并且它使用多个(递归)流来维护小于最近被筛选的数字的平方根的基本素数,如下所示:
let primesMPWSE =
let whlptrn = [| 2;4;2;4;6;2;6;4;2;4;6;6;2;6;4;2;6;4;6;8;4;2;4;2;
4;8;6;4;6;2;4;6;2;6;6;4;2;4;6;2;6;4;2;4;2;10;2;10 |]
let adv i = if i < 47 then i + 1 else 0
let reinsert oldcmpst mp (prime,pi) =
let cmpst = oldcmpst + whlptrn.[pi] * prime
match Map.tryFind cmpst mp with
| None -> mp |> Map.add cmpst [(prime,adv pi)]
| Some(facts) -> mp |> Map.add cmpst ((prime,adv pi)::facts)
let rec mkprimes (n,i) m ps q =
let nxt = n + whlptrn.[i]
match Map.tryFind n m with
| None -> if n < q then seq { yield (n,i); yield! mkprimes (nxt,adv i) m ps q }
else let (np,npi),nlst = Seq.head ps,ps |> Seq.skip 1
let (nhd,ni),nxtcmpst = Seq.head nlst,n + whlptrn.[npi] * np
mkprimes (nxt,adv i) (Map.add nxtcmpst [(np,adv npi)] m) nlst (nhd * nhd)
| Some(skips) -> let adjmap = skips |> List.fold (reinsert n) (m |> Map.remove n)
mkprimes (nxt,adv i) adjmap ps q
let rec prs = seq {yield (11,0); yield! mkprimes (13,1) Map.empty prs 121 } |> Seq.cache
seq { yield 2; yield 3; yield 5; yield 7; yield! mkprimes (11,0) Map.empty prs 121 |> Seq.map (fun (p,i) -> p) }
Run Code Online (Sandbox Code Playgroud)
它在大约0.445秒内发现第100,000个素数高达1,299,721,但不是一个适当的势在必行的EoS算法,它不能随着质数增加而线性增加,需要7.775秒才能找到1,000,000个素数达到15,485,867的表现在大约O(n ^ 1.2)的范围内,其中n是找到的最大素数.
可以进行更多的调整,但对于更好的性能,可能不会产生很大的差异,如下所示:
由于F#序列库明显很慢,可以使用实现IEnumerable的自定义类型来减少内部序列所花费的时间,但由于序列操作仅占总时间的20%,即使这些也减少到零时间结果只会减少到80%的时间.
可以尝试其他形式的地图存储,例如O'Neil提到的优先级队列或@gradbot使用的SkewBinomialHeap,但至少对于SkewBinomialHeap,性能的提升只有几个百分点.在选择不同的地图实现时,似乎只是在查找和删除列表开头附近的项目时更好的响应与插入新条目所花费的时间以获得这些好处,因此净收益几乎是一个洗并且在地图中增加条目时仍具有O(log n)性能.使用多个条目流仅仅到平方根的上述优化减少了映射中的条目数,因此使这些改进不太重要.
EDIT_ADD: 我确实做了一点额外的优化,性能确实比预期有所改善,可能是由于改进了消除Seq.skip作为推进基本素数序列的方法.该优化使用内部序列生成的替换作为整数值的元组和用于前进到序列中的下一个值的延续函数,其中最终的F#序列由整体展开操作生成.代码如下:
type SeqDesc<'a> = SeqDesc of 'a * (unit -> SeqDesc<'a>) //a self referring tuple type
let primesMPWSE =
let whlptrn = [| 2;4;2;4;6;2;6;4;2;4;6;6;2;6;4;2;6;4;6;8;4;2;4;2;
4;8;6;4;6;2;4;6;2;6;6;4;2;4;6;2;6;4;2;4;2;10;2;10 |]
let inline adv i = if i < 47 then i + 1 else 0
let reinsert oldcmpst mp (prime,pi) =
let cmpst = oldcmpst + whlptrn.[pi] * prime
match Map.tryFind cmpst mp with
| None -> mp |> Map.add cmpst [(prime,adv pi)]
| Some(facts) -> mp |> Map.add cmpst ((prime,adv pi)::facts)
let rec mkprimes (n,i) m (SeqDesc((np,npi),nsdf) as psd) q =
let nxt = n + whlptrn.[i]
match Map.tryFind n m with
| None -> if n < q then SeqDesc((n,i),fun() -> mkprimes (nxt,adv i) m psd q)
else let (SeqDesc((nhd,x),ntl) as nsd),nxtcmpst = nsdf(),n + whlptrn.[npi] * np
mkprimes (nxt,adv i) (Map.add nxtcmpst [(np,adv npi)] m) nsd (nhd * nhd)
| Some(skips) -> let adjdmap = skips |> List.fold (reinsert n) (m |> Map.remove n)
mkprimes (nxt,adv i) adjdmap psd q
let rec prs = SeqDesc((11,0),fun() -> mkprimes (13,1) Map.empty prs 121 )
let genseq sd = Seq.unfold (fun (SeqDesc((n,i),tailfunc)) -> Some(n,tailfunc())) sd
seq { yield 2; yield 3; yield 5; yield 7; yield! mkprimes (11,0) Map.empty prs 121 |> genseq }
Run Code Online (Sandbox Code Playgroud)
找到第100,000和第1,000,000个质数所需的时间分别约为0.31和5.1秒,因此这一小变化可获得相当大的百分比增益.我确实尝试了我自己的IEnumerable/IEnumerator接口的实现,这些接口是序列的基础,尽管它们比Seq模块使用的版本更快,但它们几乎没有对这个算法做出任何进一步的改变,因为大部分时间花费在地图功能. END_EDIT_ADD
除了基于地图的增量EoS实现之外,还有另一个使用Tree Folding的"纯功能"实现,据说它稍快一点,但由于它在树折叠中仍然有一个O(log n)项,我怀疑它主要是由于与使用地图相比,算法如何实现计算机操作的数量,因此更快(如果是).如果人们有兴趣,我也会开发该版本.
最后,我们必须接受增量EoS的纯函数实现不会接近于较大数值范围的良好命令式实现的原始处理速度.然而,人们可以想出一种方法,其中所有代码都是纯函数的,除了使用(可变)数组在一个范围内对复合数字进行分段筛分,这将接近O(n)性能并且在实际使用中将是50比大范围的功能算法快一百倍,例如前200,000,000个素数.这是由@Jon Harrop在他的博客中完成的,但这可以通过很少的额外代码进一步调整.
这是我尝试将Haskell代码合理忠实地翻译为F#:
#r "FSharp.PowerPack"
module Map =
let insertWith f k v m =
let v = if Map.containsKey k m then f m.[k] v else v
Map.add k v m
let sieve =
let rec sieve' map = function
| LazyList.Nil -> Seq.empty
| LazyList.Cons(x,xs) ->
if Map.containsKey x map then
let facts = map.[x]
let map = Map.remove x map
let reinsert m p = Map.insertWith (@) (x+p) [p] m
sieve' (List.fold reinsert map facts) xs
else
seq {
yield x
yield! sieve' (Map.add (x*x) [x] map) xs
}
fun s -> sieve' Map.empty (LazyList.ofSeq s)
let rec upFrom i =
seq {
yield i
yield! upFrom (i+1)
}
let primes = sieve (upFrom 2)
Run Code Online (Sandbox Code Playgroud)
使用邮箱处理器实现Prime筛:
let (<--) (mb : MailboxProcessor<'a>) (message : 'a) = mb.Post(message)
let (<-->) (mb : MailboxProcessor<'a>) (f : AsyncReplyChannel<'b> -> 'a) = mb.PostAndAsyncReply f
type 'a seqMsg =
| Next of AsyncReplyChannel<'a>
type PrimeSieve() =
let counter(init) =
MailboxProcessor.Start(fun inbox ->
let rec loop n =
async { let! msg = inbox.Receive()
match msg with
| Next(reply) ->
reply.Reply(n)
return! loop(n + 1) }
loop init)
let filter(c : MailboxProcessor<'a seqMsg>, pred) =
MailboxProcessor.Start(fun inbox ->
let rec loop() =
async {
let! msg = inbox.Receive()
match msg with
| Next(reply) ->
let rec filter prime =
if pred prime then async { return prime }
else async {
let! next = c <--> Next
return! filter next }
let! next = c <--> Next
let! prime = filter next
reply.Reply(prime)
return! loop()
}
loop()
)
let processor = MailboxProcessor.Start(fun inbox ->
let rec loop (oldFilter : MailboxProcessor<int seqMsg>) prime =
async {
let! msg = inbox.Receive()
match msg with
| Next(reply) ->
reply.Reply(prime)
let newFilter = filter(oldFilter, (fun x -> x % prime <> 0))
let! newPrime = oldFilter <--> Next
return! loop newFilter newPrime
}
loop (counter(3)) 2)
member this.Next() = processor.PostAndReply( (fun reply -> Next(reply)), timeout = 2000)
static member upto max =
let p = PrimeSieve()
Seq.initInfinite (fun _ -> p.Next())
|> Seq.takeWhile (fun prime -> prime <= max)
|> Seq.toList
Run Code Online (Sandbox Code Playgroud)
| 归档时间: |
|
| 查看次数: |
4799 次 |
| 最近记录: |