背景
正如在这个问题中所指出的,我正在使用Scalaz 7迭代器来处理恒定堆空间中的大型(即无界)数据流.
我的代码看起来像这样:
type ErrorOrT[M[+_], A] = EitherT[M, Throwable, A]
type ErrorOr[A] = ErrorOrT[IO, A]
def processChunk(c: Chunk, idx: Long): Result
def process(data: EnumeratorT[Chunk, ErrorOr]): IterateeT[Vector[(Chunk, Long)], ErrorOr, Vector[Result]] =
Iteratee.fold[Vector[(Chunk, Long)], ErrorOr, Vector[Result]](Nil) { (rs, vs) =>
rs ++ vs map {
case (c, i) => processChunk(c, i)
}
} &= (data.zipWithIndex mapE Iteratee.group(P))
Run Code Online (Sandbox Code Playgroud)
问题
我似乎遇到了内存泄漏,但我对Scalaz/FP不太熟悉,不知道这个bug是在Scalaz中还是在我的代码中.直观地说,我希望这段代码只需要(大约为)P倍的Chunk-size空间.
注:我发现了一个类似的问题,其中OutOfMemoryError遇到,但我的代码没有使用consume.
测试
我跑了一些测试试图找出问题.总而言之,只有在使用zipWithIndex和泄漏时才会出现泄漏group.
// no zipping/grouping …Run Code Online (Sandbox Code Playgroud) 考虑这段代码(从这里获取并修改为使用字节而不是字符行).
import java.io.{ File, InputStream, BufferedInputStream, FileInputStream }
import scalaz._, Scalaz._, effect._, iteratee.{ Iteratee => I, _ }
import std.list._
object IterateeIOExample {
type ErrorOr[+A] = EitherT[IO, Throwable, A]
def openStream(f: File) = IO(new BufferedInputStream(new FileInputStream(f)))
def readByte(s: InputStream) = IO(Some(s.read()).filter(_ != -1))
def closeStream(s: InputStream) = IO(s.close())
def tryIO[A, B](action: IO[B]) = I.iterateeT[A, ErrorOr, B] {
EitherT(action.catchLeft).map(r => I.sdone(r, I.emptyInput))
}
def enumBuffered(r: => BufferedInputStream) = new EnumeratorT[Int, ErrorOr] {
lazy val reader = r
def apply[A] …Run Code Online (Sandbox Code Playgroud)