RJ *_*old 5 scala enumerator scalaz iterate scalaz7
我正在尝试使用scalaz iteratee包来处理恒定空间中的大型zip文件.我需要对zip文件中的每个文件执行一个长时间运行的进程.这些过程可以(并且应该)并行运行.
我创建了一个EnumeratorT将每个膨胀ZipEntry成一个File对象.签名如下:
def enumZipFile(f:File):EnumeratorT[IoExceptionOr[IO[File]], IO]
Run Code Online (Sandbox Code Playgroud)
我想附加一个IterateeT将在每个文件上执行长时间运行的进程.我基本上最终得到了类似的东西:
type IOE[A] = IoExceptionOr[A]
def action(f:File):IO[List[Promise[IOE[File]]]] = (
consume[Promise[IOE[File]], IO, List] %=
map[IOE[File], Promise[IOE[File]], IO](longRunningProcess) %=
map[IOE[IO[File]], IOE[File], IO](_.unsafePerformIO) &=
enumZipFile(f)
).run
def longRunningProcess:(iof:IOE[File]):Promise[IOE[File]] =
Promise { Thread.sleep(5000); iof }
Run Code Online (Sandbox Code Playgroud)
当我尝试运行它时:
action(new File("/really/big/file.zip")).unsafePerformIO.sequence.get
Run Code Online (Sandbox Code Playgroud)
我收到一条java.lang.OutOfMemoryError: Java heap space消息.这对我来说很有意义,因为它试图在所有这些IO和Promise对象的内存中建立一个庞大的列表.
几个问题:
longRunningProcess它的副作用.Enumerator方法是错误的方法吗?我几乎没有想法,所以任何事情都会有所帮助.
谢谢!
更新#1
这是堆栈跟踪:
[error] java.lang.OutOfMemoryError: Java heap space
[error] at scalaz.Free.flatMap(Free.scala:46)
[error] at scalaz.effect.IO$$anonfun$flatMap$1.apply(IO.scala:62)
[error] at scalaz.effect.IO$$anonfun$flatMap$1.apply(IO.scala:61)
[error] at scalaz.effect.IOFunctions$$anon$5.apply(IO.scala:222)
[error] at scalaz.effect.IO$$anonfun$flatMap$1.apply(IO.scala:62)
[error] at scalaz.effect.IO$$anonfun$flatMap$1.apply(IO.scala:61)
[error] at scalaz.effect.IOFunctions$$anon$5.apply(IO.scala:222)
[error] at scalaz.effect.IO$$anonfun$flatMap$1.apply(IO.scala:62)
[error] at scalaz.effect.IO$$anonfun$flatMap$1.apply(IO.scala:61)
[error] at scalaz.effect.IOFunctions$$anon$5.apply(IO.scala:222)
[error] at scalaz.effect.IO$$anonfun$flatMap$1.apply(IO.scala:62)
[error] at scalaz.effect.IO$$anonfun$flatMap$1.apply(IO.scala:61)
[error] at scalaz.effect.IOFunctions$$anon$5.apply(IO.scala:222)
[error] at scalaz.effect.IO$$anonfun$flatMap$1.apply(IO.scala:62)
[error] at scalaz.effect.IO$$anonfun$flatMap$1.apply(IO.scala:61)
[error] at scalaz.effect.IOFunctions$$anon$5.apply(IO.scala:222)
[error] at scalaz.effect.IO$$anonfun$flatMap$1.apply(IO.scala:62)
[error] at scalaz.effect.IO$$anonfun$flatMap$1.apply(IO.scala:61)
[error] at scalaz.effect.IOFunctions$$anon$5.apply(IO.scala:222)
[error] at scalaz.effect.IO$$anonfun$flatMap$1.apply(IO.scala:62)
[error] at scalaz.effect.IO$$anonfun$flatMap$1.apply(IO.scala:61)
[error] at scalaz.effect.IOFunctions$$anon$5.apply(IO.scala:222)
[error] at scalaz.effect.IO$$anonfun$flatMap$1.apply(IO.scala:62)
[error] at scalaz.effect.IO$$anonfun$flatMap$1.apply(IO.scala:61)
[error] at scalaz.effect.IOFunctions$$anon$5.apply(IO.scala:222)
[error] at scalaz.effect.IO$$anonfun$flatMap$1.apply(IO.scala:62)
[error] at scalaz.effect.IO$$anonfun$flatMap$1.apply(IO.scala:61)
[error] at scalaz.effect.IOFunctions$$anon$5.apply(IO.scala:222)
[error] at scalaz.effect.IO$$anonfun$flatMap$1.apply(IO.scala:62)
[error] at scalaz.effect.IO$$anonfun$flatMap$1.apply(IO.scala:61)
[error] at scalaz.effect.IOFunctions$$anon$5.apply(IO.scala:222)
[error] at scalaz.effect.IO$$anonfun$flatMap$1.apply(IO.scala:62)
Run Code Online (Sandbox Code Playgroud)
我目前正在接受nadavwr的建议,以确保一切都像我认为的那样.我会报告任何更新.
更新#2
使用以下答案中的想法,我找到了一个不错的解决方案.正如huynhjl建议的那样(我使用nadavwr的分析堆转储的建议进行了验证),consume导致每个膨胀ZipEntry都被保存在内存中,这就是为什么进程内存不足的原因.我改consume到foldM和更新的长期运行过程中只返回一个Promise[IOE[Unit]]而不是到文件的引用.这样,我最后收集了所有IoExceptions.这是工作解决方案:
def action(f:File):IO[List[Promise[IOE[Unit]]]] = (
foldM[Promise[IOE[Unit]], IO, List[Promise[IOE[Unit]]]](List.empty)((acc,x) => IO(x :: acc)) %=
map[IOE[File], Promise[IOE[Unit]], IO](longRunningProcess) %=
map[IOE[IO[File]], IOE[File], IO](_.unsafePerformIO) &=
enumZipFile(f)
).run
def longRunningProcess:(iof:IOE[File]):Promise[IOE[Unit]] =
Promise { Thread.sleep(5000); iof.map(println) }
Run Code Online (Sandbox Code Playgroud)
此解决方案会在异步上载每个条目时对其进行扩展.最后,我有一个Promise包含任何错误的大量已完成对象.我仍然不完全相信这是对Iteratee的正确使用,但我现在有几个可重复使用的,可组合的部分,我可以在我们系统的其他部分使用(这对我们来说是一个非常常见的模式).
感谢你的帮助!
不要使用consume. 请参阅我最近的另一个答案:How to use IO with Scalaz7 Iteratees without Overflow the stack?
foldM可能是一个更好的选择。
还可以尝试将文件映射到其他内容(例如成功返回代码),以查看是否允许 JVM 对膨胀的 zip 条目进行垃圾收集。
| 归档时间: |
|
| 查看次数: |
704 次 |
| 最近记录: |