相关疑难解决方法(0)

使用Scalaz 7 zipWithIndex/group枚举避免内存泄漏

背景

正如在这个问题中所指出的,我正在使用Scalaz 7迭代器来处理恒定堆空间中的大型(即无界)数据流.

我的代码看起来像这样:

type ErrorOrT[M[+_], A] = EitherT[M, Throwable, A]
type ErrorOr[A] = ErrorOrT[IO, A]

def processChunk(c: Chunk, idx: Long): Result

def process(data: EnumeratorT[Chunk, ErrorOr]): IterateeT[Vector[(Chunk, Long)], ErrorOr, Vector[Result]] =
  Iteratee.fold[Vector[(Chunk, Long)], ErrorOr, Vector[Result]](Nil) { (rs, vs) =>
    rs ++ vs map { 
      case (c, i) => processChunk(c, i) 
    }
  } &= (data.zipWithIndex mapE Iteratee.group(P))
Run Code Online (Sandbox Code Playgroud)

问题

我似乎遇到了内存泄漏,但我对Scalaz/FP不太熟悉,不知道这个bug是在Scalaz中还是在我的代码中.直观地说,我希望这段代码只需要(大约为)P倍的Chunk-size空间.

注:我发现了一个类似的问题,其中OutOfMemoryError遇到,但我的代码没有使用consume.

测试

我跑了一些测试试图找出问题.总而言之,只有在使用zipWithIndex和泄漏时才会出现泄漏group.

// no zipping/grouping …
Run Code Online (Sandbox Code Playgroud)

scala scalaz iterate

106
推荐指数
1
解决办法
3354
查看次数

如何在没有溢出堆栈的情况下使用带有Scalaz7 Iterate的IO?

考虑这段代码(从这里获取并修改为使用字节而不是字符行).

import java.io.{ File, InputStream, BufferedInputStream, FileInputStream }
import scalaz._, Scalaz._, effect._, iteratee.{ Iteratee => I, _ }
import std.list._

object IterateeIOExample {
  type ErrorOr[+A] = EitherT[IO, Throwable, A]

  def openStream(f: File) = IO(new BufferedInputStream(new FileInputStream(f)))
  def readByte(s: InputStream) = IO(Some(s.read()).filter(_ != -1))
  def closeStream(s: InputStream) = IO(s.close())

  def tryIO[A, B](action: IO[B]) = I.iterateeT[A, ErrorOr, B] {
    EitherT(action.catchLeft).map(r => I.sdone(r, I.emptyInput))
  }

  def enumBuffered(r: => BufferedInputStream) = new EnumeratorT[Int, ErrorOr] {
    lazy val reader = r
    def apply[A] …
Run Code Online (Sandbox Code Playgroud)

scala scalaz iterate scalaz7

11
推荐指数
1
解决办法
582
查看次数

标签 统计

iterate ×2

scala ×2

scalaz ×2

scalaz7 ×1