Scala - 对有序迭代器进行惰性分组

jef*_*eon 4 iterator scala stream lazy-evaluation

我有一个Iterator[Record]record.id这种方式订购的:

record.id=1
record.id=1
...
record.id=1
record.id=2
record.id=2
..
record.id=2
Run Code Online (Sandbox Code Playgroud)

特定 ID 的记录可能会出现多次,因此我想编写一个函数,将该迭代器作为输入,并Iterator[Iterator[Record]]以惰性方式返回输出。

我能够提出以下内容,但StackOverflowError在 500K 记录左右后失败:

def groupByIter[T, B](iterO: Iterator[T])(func: T => B): Iterator[Iterator[T]] = new Iterator[Iterator[T]] {
    var iter = iterO
    def hasNext = iter.hasNext

    def next() = {
      val first = iter.next()
      val firstValue = func(first)
      val (i1, i2) = iter.span(el => func(el) == firstValue)
      iter = i2
      Iterator(first) ++ i1
    }
  }
Run Code Online (Sandbox Code Playgroud)

我究竟做错了什么?

Odo*_*ois 5

这里的问题是,每次Iterator.span调用都会为trailing迭代器创建另一个堆栈闭包,并且如果没有任何蹦床,它很容易溢出。

实际上,我不认为存在一种实现,它不会记住前缀迭代器的元素,因为可以在前缀耗尽之前访问后面的迭代器。

即使在.span实现中,也需要Queue记住Leading定义中的元素。

我能想象到的最简单的实现是以下 via Stream

implicit class StreamChopOps[T](xs: Stream[T]) {
  def chopBy[U](f: T => U): Stream[Stream[T]] = xs match {
    case x #:: _ =>
      def eq(e: T) = f(e) == f(x)
      xs.takeWhile(eq) #:: xs.dropWhile(eq).chopBy(f)
    case _ => Stream.empty
  }
}
Run Code Online (Sandbox Code Playgroud)

尽管它可能不是性能最好的,因为它会记住很多东西。但通过适当的迭代,GC 应该可以处理过多中间流的问题。

你可以用它作为myIterator.toStream.chopBy(f)

简单的检查验证以下代码可以在没有 SO 的情况下运行

Iterator.fill(10000000)(Iterator(1,1,2)).flatten //1,1,2,1,1,2,...
  .toStream.chopBy(identity)                     //(1,1),(2),(1,1),(2),...
  .map(xs => xs.sum * xs.size).sum               //60000000
Run Code Online (Sandbox Code Playgroud)