Vuz*_*uzi 3 scala stream akka akka-stream
我正在尝试将传入的Akka字节流(从http请求的主体,但也可以从文件中)拆分为多个定义大小的文件.
例如,如果我上传一个10Gb文件,它将创建类似10Gab的1Gb文件.这些文件会随机生成名称.我的问题是我真的不知道从哪里开始,因为我读过的所有响应和示例都是将整个块存储到内存中,或者使用基于字符串的分隔符.除了我不能真正拥有1Gb的"块",然后只是将它们写入磁盘..
有没有简单的方法来执行这种操作?我唯一的想法是使用这样的东西http://doc.akka.io/docs/akka/2.4/scala/stream/stream-cookbook.html#Chunking_up_a_stream_of_ByteStrings_into_limited_size_ByteStrings但转换成类似的东西FlowShape[ByteString, File],将自己写入文件块直到达到最大文件大小,然后创建一个新文件等,然后回传创建的文件.这看起来像一个不正确使用Akka的残暴想法..
提前致谢
我经常回归到纯粹的功能性,非akka技术,解决诸如此类的问题,然后将这些功能"提升"为akka结构.通过这个我的意思是我尝试只使用scala"stuff",然后尝试将这些东西包装在akka中...
文件创建
从FileOutputStream基于"随机生成的名称" 的创建开始:
def randomFileNameGenerator : String = ??? //not specified in question
import java.io.FileOutputStream
val randomFileOutGenerator : () => FileOutputStream =
() => new FileOutputStream(randomFileNameGenerator)
Run Code Online (Sandbox Code Playgroud)
州
需要某种方式来存储当前文件的"状态",例如已经写入的字节数:
case class FileState(byteCount : Int = 0,
fileOut : FileOutputStream = randomFileOutGenerator())
Run Code Online (Sandbox Code Playgroud)
文件写作
首先,我们确定是否违反了给定的最大文件大小阈值ByteString:
import akka.util.ByteString
val isEndOfChunk : (FileState, ByteString, Int) => Boolean =
(state, byteString, maxBytes) =>
state.byteCount + byteString.length > maxBytes
Run Code Online (Sandbox Code Playgroud)
然后我们必须编写创建新函数的函数,FileState如果我们已经最大化当前的容量或者如果它仍然低于容量则返回当前状态:
val closeFileInState : FileState => Unit =
(_ : FileState).fileOut.close()
val getCurrentFileState(FileState, ByteString, Int) => FileState =
(state, byteString, maxBytes) =>
if(isEndOfChunk(maxBytes, state, byteString)) {
closeFileInState(state)
FileState()
}
else
state
Run Code Online (Sandbox Code Playgroud)
唯一剩下的就是写信给FileOutputStream:
val writeToFileAndReturn(FileState, ByteString) => FileState =
(fileState, byteString) => {
fileState.fileOut write byteString.toArray
fileState copy (byteCount = fileState.byteCount + byteString.size)
}
//the signature ordering will become useful
def writeToChunkedFile(maxBytes : Int)(fileState : FileState, byteString : ByteString) : FileState =
writeToFileAndReturn(getCurrentFileState(maxBytes, fileState, byteString), byteString)
Run Code Online (Sandbox Code Playgroud)
折叠任何GenTraversableOnce
在scala中,a GenTraversableOnce是具有fold运算符的任何集合,无论是否并行.这些包括Iterator,Vector,Array,Seq,scala stream,...最终writeToChunkedFile函数完全匹配GenTraversableOnce#fold的签名:
val anyIterable : Iterable = ???
val finalFileState = anyIterable.fold(FileState())(writetochunkedFile(maxBytes))
Run Code Online (Sandbox Code Playgroud)
最后一个松散的结局; 最后还FileOutputStream需要关闭.由于折叠只会发出最后一个,FileState我们可以关闭那个:
closeFileInState(finalFileState)
Run Code Online (Sandbox Code Playgroud)
Akka Streams
Akka Flow fold从FlowOps#fold中获取它,恰好与GenTraversableOnce签名匹配.因此,我们可以将常规函数"提升"为与我们使用Iterablefold 的方式类似的流值:
import akka.stream.scaladsl.Flow
def chunkerFlow(maxBytes : Int) : Flow[ByteString, FileState, _] =
Flow[ByteString].fold(FileState())(writeToChunkedFile(maxBytes))
Run Code Online (Sandbox Code Playgroud)
使用常规函数处理问题的好处是它们可以在除流之外的其他异步框架中使用,例如Futures或Actors.你也不需要ActorSystem在单元测试中使用akka ,只需要常规的语言数据结构.
import akka.stream.scaladsl.Sink
import scala.concurrent.Future
def byteStringSink(maxBytes : Int) : Sink[ByteString, _] =
chunkerFlow(maxBytes) to (Sink foreach closeFileInState)
Run Code Online (Sandbox Code Playgroud)
然后,您可以使用此Sink排出HttpEntity来自哪里HttpRequest.