dav*_*rez 9 scala akka akka-stream akka-http
我是Akka Streams和Akka HTTP的新手.
我想生成一个简单的HTTP服务器,它可以从文件夹的内容生成一个zip文件并将其发送到客户端.
org.zeroturnaround.zip.ZipUtil使创建zip文件的任务变得非常简单,但它需要一个outputStream.
这是我的解决方案(用Scala语言编写):
val os = new ByteArrayOutputStream()
ZipUtil.pack(myFolder, os)
HttpResponse(entity = HttpEntity(
MediaTypes.`application/zip`,
os.toByteArray))
Run Code Online (Sandbox Code Playgroud)
此解决方案有效,但将所有内容保留在内存中,因此无法扩展.
我认为解决这个问题的关键是使用这个:
val source = StreamConverters.asOutputStream()
Run Code Online (Sandbox Code Playgroud)
但不知道如何使用它.:-(
有什么帮助吗?
Rüd*_*ehn 11
试试这个
val byteSource: Source[ByteString, Unit] = StreamConverters.asOutputStream()
.mapMaterializedValue(os => ZipUtil.pack(myFolder, os))
HttpResponse(entity = HttpEntity(
MediaTypes.`application/zip`,
byteSource))
Run Code Online (Sandbox Code Playgroud)
只有在源实现后才能访问OutputStream,这可能不会立即发生.从理论上讲,源也可以多次实现,所以你应该能够处理这个问题.
我有同样的问题。为了使其与背压兼容,我必须编写人工,然后InputStream将其转换为Sourcevia StreamConverters.fromInputStream(() => input),然后您从 Akka-Http DSLcomplete指令返回。
这是我写的。
import java.io.{File, IOException, InputStream}
import java.nio.charset.StandardCharsets
import java.time.LocalDate
import java.time.format.DateTimeFormatter
import org.apache.commons.compress.archivers.sevenz.{SevenZArchiveEntry, SevenZFile}
import scala.annotation.tailrec
import scala.util.{Failure, Success, Try}
class DownloadStatsZipReader(path: String, password: String) extends InputStream {
private val (archive, targetDate) = {
val inputFile = new SevenZFile(new File(path), password.getBytes(StandardCharsets.UTF_16LE.displayName()))
@tailrec
def findValidEntry(): Option[(LocalDate, SevenZArchiveEntry)] =
Option(inputFile.getNextEntry) match {
case Some(entry) =>
if (!entry.isDirectory) {
val parts = entry.getName.toLowerCase.split("\\.(?=[^\\.]+$)")
if (parts(1) == "tab" && entry.getSize > 0)
Try(LocalDate.parse(parts(0), DateTimeFormatter.ISO_LOCAL_DATE)) match {
case Success(localDate) =>
Some(localDate -> entry)
case Failure(_) =>
findValidEntry()
}
else
findValidEntry()
} else
findValidEntry()
case None => None
}
val (date, _) = findValidEntry().getOrElse {
throw new RuntimeException(s"$path has no files named as `YYYY-MM-DD.tab`")
}
inputFile -> date
}
private val buffer = new Array[Byte](1024)
private var offsetBuffer: Int = 0
private var sizeBuffer: Int = 0
def getTargetDate: LocalDate = targetDate
override def read(): Int =
sizeBuffer match {
case -1 =>
-1
case 0 =>
loadNextChunk()
read()
case _ =>
if (offsetBuffer < sizeBuffer) {
val result = buffer(offsetBuffer)
offsetBuffer += 1
result
} else {
sizeBuffer = 0
read()
}
}
@throws[IOException]
override def close(): Unit = {
archive.close()
}
private def loadNextChunk(): Unit = try {
val bytesRead = archive.read(buffer)
if (bytesRead >= 0) {
offsetBuffer = 0
sizeBuffer = bytesRead
} else {
offsetBuffer = -1
sizeBuffer = -1
}
} catch {
case ex: Throwable =>
ex.printStackTrace()
throw ex
}
}
Run Code Online (Sandbox Code Playgroud)
如果您在我的代码中发现错误,请告诉我。
| 归档时间: |
|
| 查看次数: |
1548 次 |
| 最近记录: |