Scala中的Iteratees使用惰性求值或融合?

Rob*_*een 14 scala scalaz iterate

我听说迭代是懒惰的,但他们究竟有多懒?或者,迭代可以与后处理函数融合,以便不必构建中间数据结构吗?

我可以在我的iteratee中例如Stream[Option[String]] 从ajava.io.BufferedReader构建一个100万个项目,然后None以组合方式过滤掉s,而不需要将整个Stream保存在内存中吗?同时保证我不会砸堆栈?或类似的东西 - 它不必使用Stream.

我目前正在使用Scalaz 6,但如果其他迭代实现能够以更好的方式执行此操作,我有兴趣知道.

请提供完整的解决方案,包括关闭BufferedReader和致电unsafePerformIO(如果适用).

Tra*_*own 12

下面是一个使用Scalaz 7库的快速迭代示例,该库演示了您感兴趣的属性:常量内存和堆栈使用情况.

问题

首先假设我们在每一行都有一个带有十进制数字字符串的大文本文件,我们希望找到包含至少二十个零的所有行.我们可以生成一些这样的样本数据:

val w = new java.io.PrintWriter("numbers.txt")
val r = new scala.util.Random(0)

(1 to 1000000).foreach(_ =>
  w.println((1 to 100).map(_ => r.nextInt(10)).mkString)
)

w.close()
Run Code Online (Sandbox Code Playgroud)

现在我们有一个名为的文件numbers.txt.让我们打开它BufferedReader:

val reader = new java.io.BufferedReader(new java.io.FileReader("numbers.txt"))
Run Code Online (Sandbox Code Playgroud)

它不是太大(~97兆字节),但它足够大,我们可以很容易地看到我们的内存使用是否实际上在我们处理时保持不变.

设置我们的枚举器

首先是一些进口:

import scalaz._, Scalaz._, effect.IO, iteratee.{ Iteratee => I }
Run Code Online (Sandbox Code Playgroud)

和一个普查员(注意我为了方便起见将IoExceptionOrs改为Options):

val enum = I.enumReader(reader).map(_.toOption)
Run Code Online (Sandbox Code Playgroud)

Scalaz 7目前没有提供枚举文件行的好方法,因此我们一次将一个字符分块.这当然会非常缓慢,但我不会在这里担心,因为这个演示的目标是表明我们可以在恒定的内存中处理这个大型文件,而不会破坏堆栈.这个答案的最后一部分提供了一种具有更好性能的方法,但在这里我们只是分开换行符:

val split = I.splitOn[Option[Char], List, IO](_.cata(_ != '\n', false))
Run Code Online (Sandbox Code Playgroud)

如果splitOn采用指定分割的谓词的事实会让你感到困惑,那么你并不孤单.split是我们的第一个枚举示例.我们将继续将枚举器包装在其中:

val lines = split.run(enum).map(_.sequence.map(_.mkString))
Run Code Online (Sandbox Code Playgroud)

现在我们Option[String]IOmonad中有一个s 的枚举器.

使用枚举过滤文件

接下来我们的谓词 - 记得我们说过我们想要至少有20个零的行:

val pred = (_: String).count(_ == '0') >= 20
Run Code Online (Sandbox Code Playgroud)

我们可以把它变成一个过滤枚举器并将我们的枚举器包装在:

val filtered = I.filter[Option[String], IO](_.cata(pred, true)).run(lines)
Run Code Online (Sandbox Code Playgroud)

我们将设置一个简单的动作,只打印通过此过滤器的所有内容:

val printAction = (I.putStrTo[Option[String]](System.out) &= filtered).run
Run Code Online (Sandbox Code Playgroud)

当然我们还没有读过任何东西.为此,我们使用unsafePerformIO:

printAction.unsafePerformIO()
Run Code Online (Sandbox Code Playgroud)

现在我们可以看到Some("0946943140969200621607610...")s慢慢滚动,同时我们的内存使用量保持不变.它很慢,错误处理和输出有点笨重,但我认为大约九行代码也不错.

从迭代中获取输出

那是foreach- 用法.我们还可以创建一个更像折叠的迭代 - 例如,收集通过过滤器并将它们返回到列表中的元素.只需重复上面的所有内容直到printAction定义,然后写下来:

val gatherAction = (I.consume[Option[String], IO, List] &= filtered).run
Run Code Online (Sandbox Code Playgroud)

踢掉那个动作:

val xs: Option[List[String]] = gatherAction.unsafePerformIO().sequence
Run Code Online (Sandbox Code Playgroud)

现在去喝咖啡(它可能需要很远).当你回来时,你要么None(或者在IOException某个地方的某个地方)或者Some包含1,943个字符串的列表.

完整(更快)的示例,自动关闭文件

要回答关于关闭阅读器的问题,这里有一个完整的工作示例,大致相当于上面的第二个程序,但有一个枚举器负责打开和关闭阅读器.它也快得多,因为它读取的是行,而不是字符.首先是进口和一些辅助方法:

import java.io.{ BufferedReader, File, FileReader }
import scalaz._, Scalaz._, effect._, iteratee.{ Iteratee => I, _ }

def tryIO[A, B](action: IO[B]) = I.iterateeT[A, IO, Either[Throwable, B]](
  action.catchLeft.map(
    r => I.sdone(r, r.fold(_ => I.eofInput, _ => I.emptyInput))
  )
)

def enumBuffered(r: => BufferedReader) =
  new EnumeratorT[Either[Throwable, String], IO] {
    lazy val reader = r
    def apply[A] = (s: StepT[Either[Throwable, String], IO, A]) => s.mapCont(
      k =>
        tryIO(IO(reader.readLine())).flatMap {
          case Right(null) => s.pointI
          case Right(line) => k(I.elInput(Right(line))) >>== apply[A]
          case e => k(I.elInput(e))
        }
    )
  }
Run Code Online (Sandbox Code Playgroud)

而现在的调查员:

def enumFile(f: File): EnumeratorT[Either[Throwable, String], IO] =
  new EnumeratorT[Either[Throwable, String], IO] {
    def apply[A] = (s: StepT[Either[Throwable, String], IO, A]) => s.mapCont(
      k =>
        tryIO(IO(new BufferedReader(new FileReader(f)))).flatMap {
          case Right(reader) => I.iterateeT(
            enumBuffered(reader).apply(s).value.ensuring(IO(reader.close()))
          )
          case Left(e) => k(I.elInput(Left(e)))
        }
      )
  }
Run Code Online (Sandbox Code Playgroud)

我们准备好了:

val action = (
  I.consume[Either[Throwable, String], IO, List] %=
  I.filter(_.fold(_ => true, _.count(_ == '0') >= 20)) &=
  enumFile(new File("numbers.txt"))
).run
Run Code Online (Sandbox Code Playgroud)

现在,处理完成后,读者将被关闭.