ste*_*bot 9 scala akka-stream akka-http
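For the past few days I have been trying to work out the best way to download an HTTP resource to a file using Akka Streams and Akka HTTP.
Initially I started with the Future-based variant, which looks something like this: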
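import java.io.File
import scala.concurrent.Future
import akka.http.scaladsl.Http
import akka.http.scaladsl.client.RequestBuilding.Get
import akka.http.scaladsl.model.Uri
import akka.stream.scaladsl.FileIO
// Assumes an implicit ActorSystem, Materializer and ExecutionContext in scope
def downloadViaFutures(uri: Uri, file: File): Future[Long] = {
  val request = Get(uri)
  val responseFuture = Http().singleRequest(request)
  responseFuture.flatMap { response =>
    val source = response.entity.dataBytes
    source.runWith(FileIO.toFile(file))
  }
}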
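This works well enough, but once I had learned more about pure Akka Streams I wanted to try a Flow-based variant: a stream that starts from a Source[HttpRequest]. At first this completely stumped me, until I stumbled upon the flatMapConcat flow transformation. It ended up being a bit more verbose: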
import scala.util.Try
import akka.http.scaladsl.model.HttpResponse
import akka.stream.scaladsl.Source
import akka.util.ByteString
// Unwraps the response, rethrowing the exception if the request failed
def responseOrFail[T](in: (Try[HttpResponse], T)): (HttpResponse, T) = in match {
  case (responseTry, context) => (responseTry.get, context)
}
def responseToByteSource[T](in: (HttpResponse, T)): Source[ByteString, Any] = in match {
  case (response, _) => response.entity.dataBytes
}
def downloadViaFlow(uri: Uri, file: File): Future[Long] = {
  val request = Get(uri)
  val source = Source.single((request, ())) // () is the (unused) request context
  val requestResponseFlow = Http().superPool[Unit]()
  source
    .via(requestResponseFlow)
    .map(responseOrFail)
    .flatMapConcat(responseToByteSource)
    .runWith(FileIO.toFile(file))
}
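To see what flatMapConcat does in isolation, here is a minimal sketch. It assumes Akka 2.6, where an implicit ActorSystem also provides the Materializer, rather than the older API used in the question. The names FlatMapConcatDemo and run are illustrative only. Each outer element stands in for an HttpResponse, and each inner Source stands in for its dataBytes.

```scala
// Assumes akka-stream 2.6.x on the classpath (not the Akka 2.0-era API used above)
import akka.actor.ActorSystem
import akka.stream.scaladsl.{Sink, Source}
import scala.concurrent.Await
import scala.concurrent.duration._

object FlatMapConcatDemo {
  def run(): Seq[String] = {
    // In Akka 2.6 an implicit ActorSystem is enough to materialize streams
    implicit val system: ActorSystem = ActorSystem("flatMapConcat-demo")
    try {
      val done = Source(List("a", "b"))
        // Each outer element is expanded into its own inner Source; flatMapConcat
        // drains one inner Source completely before starting the next one
        .flatMapConcat(prefix => Source(1 to 3).map(i => s"$prefix$i"))
        .runWith(Sink.seq)
      Await.result(done, 5.seconds)
    } finally {
      system.terminate()
    }
  }
}
```

Calling FlatMapConcatDemo.run() should yield a1, a2, a3, b1, b2, b3. The inner Sources are concatenated in the order of the outer elements, never interleaved.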
Then I wanted to get a bit trickier and name the file from the Content-Disposition header.
Going back to the Future-based variant:
import akka.http.scaladsl.model.headers.`Content-Disposition`
def destinationFile(downloadDir: File, response: HttpResponse): File = {
  // Read the "filename" parameter: `.value` would return the whole header value
  // (e.g. attachment; filename=foo.txt), not just the file name
  val fileName = response.header[`Content-Disposition`].get.params("filename")
  val file = new File(downloadDir, fileName)
  file.createNewFile()
  file
}
def downloadViaFutures2(uri: Uri, downloadDir: File): Future[Long] = {
  val request = Get(uri)
  val responseFuture = Http().singleRequest(request)
  responseFuture.flatMap { response =>
    val file = destinationFile(downloadDir, response)
    val source = response.entity.dataBytes
    source.runWith(FileIO.toFile(file))
  }
}
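A more defensive way to choose the file name is sketched below. It assumes Akka HTTP 10.2, where the Scala header model is `Content-Disposition` with a params map. The helper fileNameFor, the fallback to the last URI path segment and the default name download.bin are hypothetical choices and are not part of the question. Unlike `.get`, this helper does not throw when the header is missing.

```scala
// Assumes akka-http 10.2.x (with akka-stream 2.6.x) on the classpath
import java.io.File
import akka.http.scaladsl.model.{HttpResponse, Uri}
import akka.http.scaladsl.model.headers.`Content-Disposition`

object DownloadNames {
  // Hypothetical helper: prefer the Content-Disposition "filename" parameter,
  // fall back to the last segment of the request URI, then to a fixed name
  def fileNameFor(response: HttpResponse, uri: Uri): String = {
    val fromHeader = response.header[`Content-Disposition`].flatMap(_.params.get("filename"))
    val fromUri = uri.path.toString.split('/').lastOption.filter(_.nonEmpty)
    val raw = fromHeader.orElse(fromUri).getOrElse("download.bin")
    // Keep only the final path component so a server-supplied name such as
    // "../../etc/passwd" cannot escape the download directory
    new File(raw).getName
  }
}
```

Taking getName of the result means a header such as filename="../../etc/passwd" resolves to passwd inside the download directory instead of a path outside it.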
But now I have no idea how to do the same thing with the Flow-based variant. This is what I have so far:
def responseToByteSourceWithDest[T](in: (HttpResponse, T), downloadDir: File): Source[(ByteString, File), Any] = in match {
  case (response, _) =>
    val source = responseToByteSource(in)
    val file = destinationFile(downloadDir, response)
    source.map((_, file))
}
def downloadViaFlow2(uri: Uri, downloadDir: File): Future[Long] = {
  val request = Get(uri)
  val source = Source.single((request, ()))
  val requestResponseFlow = Http().superPool[Unit]()
  val sourceWithDest: Source[(ByteString, File), Unit] = source
    .via(requestResponseFlow)
    .map(responseOrFail)
    .flatMapConcat(responseToByteSourceWithDest(_, downloadDir))
  sourceWithDest.runWith(???) // which Sink goes here?
}
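So now I have a Source that emits one or more (ByteString, File) elements for each File. I say "each File" because there is no reason the original Source has to contain only a single HttpRequest.
Is there any way to take these elements and route them to a dynamic Sink?
I'm thinking of something like flatMapConcat, for example: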
// Hypothetical method on Source[Out, Mat]; it does not exist in Akka Streams
def runWithMap[T, Mat2](f: T => Graph[SinkShape[Out], Mat2])(implicit materializer: Materializer): Mat2 = ???
so that I could complete downloadViaFlow2 like this:
def destToSink(destination: File): Sink[(ByteString, File), Future[Long]] = {
  val sink = FileIO.toFile(destination, true) // append = true
  Flow[(ByteString, File)].map(_._1).toMat(sink)(Keep.right)
}
sourceWithDest.runWithMap {
  case (_, file) => destToSink(file)
}
The solution does not require flatMapConcat. If you don't need any return value from the file writes, you can use Sink.foreach:
import akka.http.scaladsl.model.HttpRequest
import akka.stream.scaladsl.Sink
def writeFile(downloadDir: File)(httpResponse: HttpResponse): Future[Long] = {
  val file = destinationFile(downloadDir, httpResponse)
  httpResponse.entity.dataBytes.runWith(FileIO.toFile(file))
}
def downloadViaFlow2(uri: Uri, downloadDir: File): Future[Unit] = {
  val request = HttpRequest(uri = uri)
  val source = Source.single((request, ()))
  val requestResponseFlow = Http().superPool[Unit]()
  source.via(requestResponseFlow)
    .map(responseOrFail)
    .map(_._1) // keep only the HttpResponse
    .runWith(Sink.foreach(writeFile(downloadDir))) // starts one write per response
}
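Note that Sink.foreach creates Futures from the writeFile function, so there is little back-pressure involved. writeFile may be slowed down by the hard drive, but the stream keeps creating new Futures regardless. To bound this, you can use Flow.mapAsyncUnordered (or Flow.mapAsync):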
val parallelism = 10
source.via(requestResponseFlow)
  .map(responseOrFail)
  .map(_._1)
  .mapAsyncUnordered(parallelism)(writeFile(downloadDir)) // at most 10 writes in flight
  .runWith(Sink.ignore)
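The effect of the parallelism bound can be checked without any HTTP at all. This sketch, again assuming Akka 2.6, replaces writeFile with a hypothetical slowWrite that takes about 20 ms. It records how many writes are in flight at once. The sleep blocks a dispatcher thread and is there only to simulate a slow disk.

```scala
// Assumes akka-stream 2.6.x on the classpath
import java.util.concurrent.atomic.AtomicInteger
import akka.actor.ActorSystem
import akka.stream.scaladsl.{Sink, Source}
import scala.concurrent.{Await, Future}
import scala.concurrent.duration._

object BoundedWrites {
  // Runs 40 simulated writes through mapAsyncUnordered and returns the highest
  // number of writes that were ever in flight at the same time
  def maxConcurrentWrites(parallelism: Int): Int = {
    implicit val system: ActorSystem = ActorSystem("mapAsync-demo")
    import system.dispatcher
    val inFlight = new AtomicInteger(0)
    val maxSeen = new AtomicInteger(0)

    // Hypothetical stand-in for writeFile: a Future that takes about 20 ms
    def slowWrite(n: Int): Future[Long] = {
      val now = inFlight.incrementAndGet()
      maxSeen.accumulateAndGet(now, (a: Int, b: Int) => math.max(a, b))
      Future {
        Thread.sleep(20) // blocking, only to simulate a slow disk
        inFlight.decrementAndGet() // decremented before the Future completes
        n.toLong
      }
    }

    try {
      val done = Source(1 to 40)
        .mapAsyncUnordered(parallelism)(slowWrite)
        .runWith(Sink.ignore)
      Await.result(done, 10.seconds)
      maxSeen.get
    } finally {
      system.terminate()
    }
  }
}
```

With mapAsyncUnordered(4), maxConcurrentWrites(4) should never report more than 4. The stage only calls slowWrite again after one of its outstanding Futures has completed.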
If you want to accumulate the Long values into a total count, combine this with Sink.fold:
source.via(requestResponseFlow)
  .map(responseOrFail)
  .map(_._1)
  .mapAsyncUnordered(parallelism)(writeFile(downloadDir))
  .runWith(Sink.fold(0L)(_ + _)) // sum of the Long returned by every writeFile
The fold keeps a running sum and emits the final value once the source of requests completes.
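The difference between a fold, which emits a single value at the end, and a running total can be seen in a small sketch that assumes Akka 2.6. The three Long values stand in for the byte counts returned by writeFile, and FoldVsScan is a hypothetical name. If you want progress updates instead of a single total, scan emits every intermediate sum.

```scala
// Assumes akka-stream 2.6.x on the classpath
import akka.actor.ActorSystem
import akka.stream.scaladsl.{Sink, Source}
import scala.concurrent.Await
import scala.concurrent.duration._

object FoldVsScan {
  // Stand-ins for the Long values produced by each writeFile Future
  val byteCounts: List[Long] = List(3L, 4L, 5L)

  def run(): (Long, Seq[Long]) = {
    implicit val system: ActorSystem = ActorSystem("fold-demo")
    try {
      // Sink.fold emits exactly one value, after the source completes
      val total = Source(byteCounts).runWith(Sink.fold[Long, Long](0L)(_ + _))
      // scan emits its zero value and then every intermediate running total
      val running = Source(byteCounts).scan(0L)(_ + _).runWith(Sink.seq)
      (Await.result(total, 5.seconds), Await.result(running, 5.seconds))
    } finally {
      system.terminate()
    }
  }
}
```

run() should return 12 for the fold and 0, 3, 7, 12 for the scan, because scan emits its zero value before the first element arrives.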