使用Play2/Scala通过Iteratee将文件上载流转发到S3

Seb*_*ber 18 scala amazon-s3 iterate playframework-2.0

我已经阅读了一些关于通过Iteratee将文件发送到S3的可能性,这似乎允许在我们收到文件时发送一个文件的S3块,并避免出现大文件的OutOfMemory例如.

我发现这个SO帖子可能几乎是我需要做的事情: 播放2.x:使用Iteratees上传活动文件 我真的不明白该怎么做,或者如果它真的可以在Play 2.0.2中使用(因为Sadek Brodi说foldM仅在Play 2.1中提供,例如

有人可以通过简单的方式解释这一点,对于那些阅读过有关Iteratees的博客,还不是Scala/Play2专家的人来说?

我甚至不知道我是否应该使用多部分体分析器或类似的东西,但我知道的一件事是我不明白这段代码在做什么:

val consumeAMB = 
  Traversable.takeUpTo[Array[Byte]](1028*1028) &>> Iteratee.consume()

val rechunkAdapter:Enumeratee[Array[Byte],Array[Byte]] =
  Enumeratee.grouped(consumeAMB)

val writeToStore: Iteratee[Array[Byte],_] =
  Iteratee.foldM[Array[Byte],_](connectionHandle){ (c,bytes) => 
    // write bytes and return next handle, probable in a Future
  }

BodyParser( rh => (rechunkAdapter &>> writeToStore).map(Right(_)))
Run Code Online (Sandbox Code Playgroud)

顺便说一句,与使用经典Java InputStream/OutputStream相比,内存消耗会有什么不同.我实际上能够以非阻塞方式将500mb文件转发到S3,内存消耗非常低,不使用Iteratees,使用Java + AsyncHttpClient + Grizzly(但我想它也适用于Netty).

那么使用Iteratee有什么好处?

我可以看到的一个区别是我获取并转发到S3的InputStream在我的情况下由临时文件支持(这是一个CXF行为),因此它可能不像Play Iteratee那样具有反应性

但是对于Iteratees,如果Enumerator产生连接接收到的字节并通过Iteratee将它们转发给S3,那么如果与S3的连接不好并且字节无法快速转发,则存储"待定"字节?

Ric*_*rty 5

简单的解释?我会尽力.:)

您正在使用组件构建管道.一旦构建了管道,就可以发送数据.它是一个Iteratee,所以它知道如何迭代数据.

您要上传的文件包含在请求正文中,BodyParser处理Play中的请求正文.因此,您将iteratee管道放入BodyParser中.当请求发出时,您的管道将被发送数据(它将迭代它).

您的管道(rechunkAdapter &>> writeToStore)将数据分块为1MB位然后将它们发送到S3.

管道的第一部分(rechunkAdapter)进行分块.它实际上有自己的迷你管道来执行分块(consumeAMB).一旦迷你管道收到足够的数据来制作一个块,它就会将它发送出主管道.

管道(writeToStore)的第二部分就像是在每个块上调用的循环,让您有机会将每个块发送到S3.

迭代的优点?

一旦你知道发生了什么,你就可以通过将组件插在一起来构建迭代管道.类型检查器通常会在您错误地插入某些内容时告诉您.

例如,我们可以修改上面的管道来修复它很慢的事实.它可能很慢,因为每当一个块准备好上传到S3时,请求上传就会暂停.减慢请求上传速度非常重要,这样我们就不会耗尽内存,但通过添加固定大小的缓冲区,我们可以更加宽容.因此,只需添加Concurrent.buffer(2)到管道中间即可缓冲最多2个块.

Iteratees为流提供了一种功能性方法.这是一个优点或缺点,取决于您对函数式编程的感觉.:)与懒惰流(另一种功能方法)相比,迭代提供对资源使用的精确控制.

最后,迭代允许我们相对简单地执行非常复杂的异步流编程(!).我们可以在不保存线程的情况下处理IO,这对可扩展性来说是一个巨大的胜利 经典的Java InputStream/OutputStream示例需要2个线程.