ste*_*red 9 primes iterator scala stream out-of-memory
我写了一个函数,使用流无限期地生成质数(维基百科:Erastothenes的增量筛).它返回一个流,但它也在内部合并素数倍流以标记即将到来的复合.如果我自己这样说,这个定义简洁,实用,优雅且易于理解:
def primes(): Stream[Int] = {
def merge(a: Stream[Int], b: Stream[Int]): Stream[Int] = {
def next = a.head min b.head
Stream.cons(next, merge(if (a.head == next) a.tail else a,
if (b.head == next) b.tail else b))
}
def test(n: Int, compositeStream: Stream[Int]): Stream[Int] = {
if (n == compositeStream.head) test(n+1, compositeStream.tail)
else Stream.cons(n, test(n+1, merge(compositeStream, Stream.from(n*n, n))))
}
test(2, Stream.from(4, 2))
}
Run Code Online (Sandbox Code Playgroud)
但是,当我尝试生成第1000个素数时,我得到了"java.lang.OutOfMemoryError:超出GC开销限制".
我有一个替代解决方案,它在primes上返回一个迭代器,并在内部使用元组的优先级队列(multiple,prime用于生成多个)来标记即将到来的复合.它运行良好,但它需要大约两倍的代码,我基本上不得不从头重新开始:
import scala.collection.mutable.PriorityQueue
def primes(): Iterator[Int] = {
// Tuple (composite, prime) is used to generate a primes multiples
object CompositeGeneratorOrdering extends Ordering[(Long, Int)] {
def compare(a: (Long, Int), b: (Long, Int)) = b._1 compare a._1
}
var n = 2;
val composites = PriorityQueue(((n*n).toLong, n))(CompositeGeneratorOrdering)
def advance = {
while (n == composites.head._1) { // n is composite
while (n == composites.head._1) { // duplicate composites
val (multiple, prime) = composites.dequeue
composites.enqueue((multiple + prime, prime))
}
n += 1
}
assert(n < composites.head._1)
val prime = n
n += 1
composites.enqueue((prime.toLong * prime.toLong, prime))
prime
}
Iterator.continually(advance)
}
Run Code Online (Sandbox Code Playgroud)
有没有一种直接的方法将带有流的代码转换为带有迭代器的代码?或者是否有一种简单的方法可以让我的第一次尝试更有效率?
从流的角度思考会更容易; 我宁愿那样开始,然后在必要时调整我的代码.
我想这是当前Stream实施中的一个错误.
primes().drop(999).head 工作良好:
primes().drop(999).head
// Int = 7919
Run Code Online (Sandbox Code Playgroud)
你将得到如下OutOfMemoryError存储Stream:
val prs = primes()
prs.drop(999).head
// Exception in thread "main" java.lang.OutOfMemoryError: GC overhead limit exceeded
Run Code Online (Sandbox Code Playgroud)
这里有类Cons 实现的问题:它不仅包含计算tail,还包含计算此函数的函数tail.即使tail计算完毕,也不再需要功能!
在这种情况下,功能非常繁重,因此OutOfMemoryError即使存储了1000个函数也是如此.
我们不得不以某种方式放弃这些功能.
直观修复失败:
val prs = primes().iterator.toStream
prs.drop(999).head
// Exception in thread "main" java.lang.OutOfMemoryError: GC overhead limit exceeded
Run Code Online (Sandbox Code Playgroud)
随着iterator上Stream你会得到StreamIterator与StreamIterator#toStream你会得到初始重Stream.
所以我们必须手动转换它:
def toNewStream[T](i: Iterator[T]): Stream[T] =
if (i.hasNext) Stream.cons(i.next, toNewStream(i))
else Stream.empty
val prs = toNewStream(primes().iterator)
// Stream[Int] = Stream(2, ?)
prs.drop(999).head
// Int = 7919
Run Code Online (Sandbox Code Playgroud)
在你的第一个代码中,你应该推迟合并,直到候选人中看到素数的平方.这将大大减少正在使用的流的数量,从根本上改善您的内存使用问题.为了获得第1000个素数,7919,我们只需要考虑不高于其平方根的素数,88.这只是 他们的倍数的23个素数/流,而不是999(22,如果我们从一开始就忽略了均衡).对于第10,000个素数,它是9999个倍数流和66个流之间的区别.而对于第100,000个,只需要189个.
诀窍是通过递归调用将正在消耗的素数与正在生成的素数分开:
def primes(): Stream[Int] = {
def merge(a: Stream[Int], b: Stream[Int]): Stream[Int] = {
def next = a.head min b.head
Stream.cons(next, merge(if (a.head == next) a.tail else a,
if (b.head == next) b.tail else b))
}
def test(n: Int, q: Int,
compositeStream: Stream[Int],
primesStream: Stream[Int]): Stream[Int] = {
if (n == q) test(n+2, primesStream.tail.head*primesStream.tail.head,
merge(compositeStream,
Stream.from(q, 2*primesStream.head).tail),
primesStream.tail)
else if (n == compositeStream.head) test(n+2, q, compositeStream.tail,
primesStream)
else Stream.cons(n, test(n+2, q, compositeStream, primesStream))
}
Stream.cons(2, Stream.cons(3, Stream.cons(5,
test(7, 25, Stream.from(9, 6), primes().tail.tail))))
}
Run Code Online (Sandbox Code Playgroud)
作为额外的奖励,没有必要将素数的正方形存储为Longs.这也将更快,并且具有更好的算法复杂性(时间和空间),因为它避免了做大量多余的工作.Ideone测试示出了它运行在约〜Ñ 1.5..1.6 生长的经验订单中产生高达N = 80000个素数.
这里仍然存在一个算法问题:这里创建的结构仍然是一个线性的左倾结构(((mults_of_2 + mults_of_3) + mults_of_5) + ...),其中更频繁产生的流位于其内部更深处(因此数字有更多的层次可以渗透,上升).右倾结构应该更好,mults_of_2 + (mults_of_3 + (mults_of_5 + ...)).使其成为一棵树应该带来的时间复杂度的真正改善(向下推典型为至约〜ñ 1.2..1.25).有关相关讨论,请参阅此haskellwiki页面.
埃拉托塞尼的"真实"势在必行筛通常在周围运行〜Ñ 1.1(在Ñ产生素数),并在〜最佳试除法筛Ñ 1.40..1.45.您的原始代码大约在立方时间运行,或者更糟.使用命令式可变阵列通常是最快的,按段(也就是Eratosthenes的分段筛)工作.
在第二个代码的上下文中,这就是它在Python中的实现方式.
有没有一种直接的方法将带有流的代码转换为带有迭代器的代码?或者是否有一种简单的方法可以让我的第一次尝试更有效率?
@Will Ness使用Streams为您提供了一个改进的答案,并给出了为什么您的代码在早期添加流和左倾线性结构时占用了大量内存和时间的原因,但没有人完全回答第二个(或者可能是主要的)关于Eratosthenes的真正增量筛选可以用Iterator实现的部分问题.
首先,我们应该恰当地归功于这种右倾算法,其中您的第一个代码是粗略的(左倾)示例(因为它过早地将所有主要合成流添加到合并操作中),这是由于理查德·伯德在结语中所致的梅丽莎E.奥尼尔的增量筛的埃拉托色尼的最终文件.
第二,不,在这个算法中用Iterator替换Stream是不可能的,因为它依赖于在没有重新启动流的情况下移动流,并且虽然可以访问迭代器的头部(当前位置),但是使用下一个值(跳过头部)生成迭代的其余部分,因为流需要在内存和时间上以可怕的代价构建一个全新的迭代器.但是,我们可以使用Iterator输出素数序列的结果,以便最大限度地减少内存使用并使其易于使用迭代器或更高阶函数,如下面的代码所示.
现在Will Ness已经引导你了解将主要复合流添加到计算中直到需要它们的原则,当一个人将它们存储在诸如优先级队列或HashMap之类的结构中时甚至错过了O' Neill论文,但是对于Richard Bird算法,这不是必需的,因为在需要之前将不会访问未来的流值,因此如果Streams被正确地延迟构建(如懒惰和左倾),则不存储.实际上,该算法甚至不需要完整流的记忆和开销,因为每个复合数剔除序列仅向前移动而不参考任何过去的素数,除了需要单独的基本素数源,其可以由相同算法的递归调用.
为了便于参考,让我们列出Richard Bird算法的Haskell代码,如下所示:
primes = 2:([3..] ‘minus‘ composites)
where
composites = union [multiples p | p <? primes]
multiples n = map (n*) [n..]
(x:xs) ‘minus‘ (y:ys)
| x < y = x:(xs ‘minus‘ (y:ys))
| x == y = xs ‘minus‘ ys
| x > y = (x:xs) ‘minus‘ ys
union = foldr merge []
where
merge (x:xs) ys = x:merge’ xs ys
merge’ (x:xs) (y:ys)
| x < y = x:merge’ xs (y:ys)
| x == y = x:merge’ xs ys
| x > y = y:merge’ (x:xs) ys
Run Code Online (Sandbox Code Playgroud)
在下面的代码中,我简化了'减'函数(称为"minusStrtAt"),因为我们不需要构建一个全新的流,但可以将复合减法运算与原始代码相结合(仅在我的情况下为几率)序列.我还简化了"联合"功能(将其重命名为"mrgMltpls")
流操作作为非记忆通用Co归纳流(CIS)实现为泛型类,其中类的第一个字段是流的当前位置的值,第二个字段是thunk(返回零参数函数)通过嵌入式闭包参数到另一个函数的流的下一个值).
def primes(): Iterator[Long] = {
// generic class as a Co Inductive Stream element
class CIS[A](val v: A, val cont: () => CIS[A])
def mltpls(p: Long): CIS[Long] = {
var px2 = p * 2
def nxtmltpl(cmpst: Long): CIS[Long] =
new CIS(cmpst, () => nxtmltpl(cmpst + px2))
nxtmltpl(p * p)
}
def allMltpls(mps: CIS[Long]): CIS[CIS[Long]] =
new CIS(mltpls(mps.v), () => allMltpls(mps.cont()))
def merge(a: CIS[Long], b: CIS[Long]): CIS[Long] =
if (a.v < b.v) new CIS(a.v, () => merge(a.cont(), b))
else if (a.v > b.v) new CIS(b.v, () => merge(a, b.cont()))
else new CIS(b.v, () => merge(a.cont(), b.cont()))
def mrgMltpls(mlps: CIS[CIS[Long]]): CIS[Long] =
new CIS(mlps.v.v, () => merge(mlps.v.cont(), mrgMltpls(mlps.cont())))
def minusStrtAt(n: Long, cmpsts: CIS[Long]): CIS[Long] =
if (n < cmpsts.v) new CIS(n, () => minusStrtAt(n + 2, cmpsts))
else minusStrtAt(n + 2, cmpsts.cont())
// the following are recursive, where cmpsts uses oddPrms and
// oddPrms uses a delayed version of cmpsts in order to avoid a race
// as oddPrms will already have a first value when cmpsts is called to generate the second
def cmpsts(): CIS[Long] = mrgMltpls(allMltpls(oddPrms()))
def oddPrms(): CIS[Long] = new CIS(3, () => minusStrtAt(5L, cmpsts()))
Iterator.iterate(new CIS(2L, () => oddPrms()))
{(cis: CIS[Long]) => cis.cont()}
.map {(cis: CIS[Long]) => cis.v}
}
Run Code Online (Sandbox Code Playgroud)
上面的代码生成关于第十万素(1299709)ideone与约0.36第二塔顶约1.3秒,并具有大约1.43 600000个素数的经验计算复杂度.内存使用可以忽略不计,高于程序代码使用的内存.
上面的代码可以使用内置的Scala Streams实现,但是该算法不需要性能和内存使用开销(常量因子).使用Streams意味着可以直接使用它们而无需额外的Iterator生成代码,但由于这仅用于序列的最终输出,因此不会花费太多.
要按照Will Ness的建议实现一些基本的树折叠,只需添加一个"对"函数并将其挂钩到"mrgMltpls"函数中:
def primes(): Iterator[Long] = {
// generic class as a Co Inductive Stream element
class CIS[A](val v: A, val cont: () => CIS[A])
def mltpls(p: Long): CIS[Long] = {
var px2 = p * 2
def nxtmltpl(cmpst: Long): CIS[Long] =
new CIS(cmpst, () => nxtmltpl(cmpst + px2))
nxtmltpl(p * p)
}
def allMltpls(mps: CIS[Long]): CIS[CIS[Long]] =
new CIS(mltpls(mps.v), () => allMltpls(mps.cont()))
def merge(a: CIS[Long], b: CIS[Long]): CIS[Long] =
if (a.v < b.v) new CIS(a.v, () => merge(a.cont(), b))
else if (a.v > b.v) new CIS(b.v, () => merge(a, b.cont()))
else new CIS(b.v, () => merge(a.cont(), b.cont()))
def pairs(mltplss: CIS[CIS[Long]]): CIS[CIS[Long]] = {
val tl = mltplss.cont()
new CIS(merge(mltplss.v, tl.v), () => pairs(tl.cont()))
}
def mrgMltpls(mlps: CIS[CIS[Long]]): CIS[Long] =
new CIS(mlps.v.v, () => merge(mlps.v.cont(), mrgMltpls(pairs(mlps.cont()))))
def minusStrtAt(n: Long, cmpsts: CIS[Long]): CIS[Long] =
if (n < cmpsts.v) new CIS(n, () => minusStrtAt(n + 2, cmpsts))
else minusStrtAt(n + 2, cmpsts.cont())
// the following are recursive, where cmpsts uses oddPrms and
// oddPrms uses a delayed version of cmpsts in order to avoid a race
// as oddPrms will already have a first value when cmpsts is called to generate the second
def cmpsts(): CIS[Long] = mrgMltpls(allMltpls(oddPrms()))
def oddPrms(): CIS[Long] = new CIS(3, () => minusStrtAt(5L, cmpsts()))
Iterator.iterate(new CIS(2L, () => oddPrms()))
{(cis: CIS[Long]) => cis.cont()}
.map {(cis: CIS[Long]) => cis.v}
}
Run Code Online (Sandbox Code Playgroud)
上面的代码生成关于第十万素(1299709)ideone在约0.75秒约0.37第二塔顶并具有约1.09的第一百万素(15485863)(5.13秒)的经验计算复杂度.内存使用可以忽略不计,高于程序代码使用的内存.
请注意,上面的代码完全是功能性的,因为没有任何可变状态,但Bird算法(甚至树折叠版本)没有使用优先级队列或HashMap更大的范围作为数量处理树合并的操作比优先级队列的log n开销或HashMap的线性(摊销)性能具有更高的计算复杂度(尽管处理散列有很大的常数因子开销,因此优势并非如此看到直到使用一些真正大的范围).
这些代码使用如此少的内存的原因是CIS流被公式化而没有永久引用流的开始,因此在使用流时垃圾被收集,只留下最少数量的基本素数复合序列占位符Will Ness解释的非常小 - 只有546个基本素数复合数据流用于生成高达15485863的第一个百万个素数,每个占位符只占用几个10字节(8个用于长数,8个用于64位)函数引用,另外两个字节用于指向闭包参数的指针,另外几个字节用于函数和类开销,每个流的占位符总数可能为40个字节,或总共不超过20千字节用于生成一百万个素数的序列).
| 归档时间: |
|
| 查看次数: |
1287 次 |
| 最近记录: |