如何使用Akka HTTP通过输出流生成内容

dav*_*rez 9 scala akka akka-stream akka-http

我是Akka Streams和Akka HTTP的新手.

我想生成一个简单的HTTP服务器,它可以从文件夹的内容生成一个zip文件并将其发送到客户端.

org.zeroturnaround.zip.ZipUtil使创建zip文件的任务变得非常简单,但它需要一个outputStream.

这是我的解决方案(用Scala语言编写):

            val os = new ByteArrayOutputStream()
            ZipUtil.pack(myFolder, os)
            HttpResponse(entity = HttpEntity(
                MediaTypes.`application/zip`,
                os.toByteArray))
Run Code Online (Sandbox Code Playgroud)

此解决方案有效,但将所有内容保留在内存中,因此无法扩展.

我认为解决这个问题的关键是使用这个:

val source = StreamConverters.asOutputStream()
Run Code Online (Sandbox Code Playgroud)

但不知道如何使用它.:-(

有什么帮助吗?

Rüd*_*ehn 11

试试这个

val byteSource: Source[ByteString, Unit] = StreamConverters.asOutputStream()
  .mapMaterializedValue(os => ZipUtil.pack(myFolder, os))
HttpResponse(entity = HttpEntity(
            MediaTypes.`application/zip`,
            byteSource))
Run Code Online (Sandbox Code Playgroud)

只有在源实现后才能访问OutputStream,这可能不会立即发生.从理论上讲,源也可以多​​次实现,所以你应该能够处理这个问题.

  • 我希望我之前知道:) (2认同)

exp*_*ert 3

我有同样的问题。为了使其与背压兼容,我必须编写人工,然后InputStream将其转换为Sourcevia StreamConverters.fromInputStream(() => input),然后您从 Akka-Http DSLcomplete指令返回。

这是我写的。

import java.io.{File, IOException, InputStream}
import java.nio.charset.StandardCharsets
import java.time.LocalDate
import java.time.format.DateTimeFormatter

import org.apache.commons.compress.archivers.sevenz.{SevenZArchiveEntry, SevenZFile}

import scala.annotation.tailrec
import scala.util.{Failure, Success, Try}

class DownloadStatsZipReader(path: String, password: String) extends InputStream {

  private val (archive, targetDate) = {
    val inputFile = new SevenZFile(new File(path), password.getBytes(StandardCharsets.UTF_16LE.displayName()))

    @tailrec
    def findValidEntry(): Option[(LocalDate, SevenZArchiveEntry)] =
      Option(inputFile.getNextEntry) match {
        case Some(entry) =>
          if (!entry.isDirectory) {
            val parts = entry.getName.toLowerCase.split("\\.(?=[^\\.]+$)")
            if (parts(1) == "tab" && entry.getSize > 0)
              Try(LocalDate.parse(parts(0), DateTimeFormatter.ISO_LOCAL_DATE)) match {
                case Success(localDate) =>
                  Some(localDate -> entry)
                case Failure(_) =>
                  findValidEntry()
              }
            else
              findValidEntry()
          } else
            findValidEntry()
        case None => None
      }

    val (date, _) = findValidEntry().getOrElse {
      throw new RuntimeException(s"$path has no files named as `YYYY-MM-DD.tab`")
    }
    inputFile -> date
  }

  private val buffer = new Array[Byte](1024)
  private var offsetBuffer: Int = 0
  private var sizeBuffer: Int = 0

  def getTargetDate: LocalDate = targetDate

  override def read(): Int =
    sizeBuffer match {
      case -1 =>
        -1
      case 0 =>
        loadNextChunk()
        read()
      case _ =>
        if (offsetBuffer < sizeBuffer) {
          val result = buffer(offsetBuffer)
          offsetBuffer += 1
          result
        } else {
          sizeBuffer = 0
          read()
        }
    }

  @throws[IOException]
  override def close(): Unit = {
    archive.close()
  }

  private def loadNextChunk(): Unit = try {
    val bytesRead = archive.read(buffer)
    if (bytesRead >= 0) {
      offsetBuffer = 0
      sizeBuffer = bytesRead
    } else {
      offsetBuffer = -1
      sizeBuffer = -1
    }
  } catch {
    case ex: Throwable =>
      ex.printStackTrace()
      throw ex
  }
}
Run Code Online (Sandbox Code Playgroud)

如果您在我的代码中发现错误,请告诉我。