Streaming a CSV with akka-http in Scala

oge*_*gen 1 csv scala akka akka-http

I am new to akka-http, and I want to stream a CSV with an arbitrary number of rows.

For example, I would like to respond with:

a,1
b,2
c,3

using the following code:

implicit val actorSystem = ActorSystem("system")
implicit val actorMaterializer = ActorMaterializer()

val map = new mutable.HashMap[String, Int]()
map.put("a", 1)
map.put("b", 2)
map.put("c", 3)
val `text/csv` = ContentType(MediaTypes.`text/csv`, `UTF-8`)
val route =
  path("test") {
    complete {
      HttpEntity(`text/csv`, ??? using map)
    }
  }
Http().bindAndHandle(route,"localhost",8080)

Thanks for your help.

Edit: thanks to Ramon J Romero y Vigil

package test


import akka.actor.ActorSystem
import akka.http.scaladsl.Http
import akka.http.scaladsl.model.HttpCharsets.`UTF-8`
import akka.http.scaladsl.model._
import akka.http.scaladsl.server.Directives._
import akka.stream._
import akka.util.ByteString

import scala.collection.mutable

object Test{

  def main(args: Array[String]) {

    implicit val actorSystem = ActorSystem("system")
    implicit val actorMaterializer = ActorMaterializer()

    val map = new mutable.HashMap[String, Int]()
    map.put("a", 1)
    map.put("b", 2)
    map.put("c", 3)

    val mapStream = Stream.fromIterator(() => map.toIterator)
      .map((k: String, v: Int) => s"$k,$v")
      .map(ByteString.apply)
    val `text/csv` = ContentType(MediaTypes.`text/csv`, `UTF-8`)
    val route =
      path("test") {
        complete {
          HttpEntity(`text/csv`, mapStream)
        }
      }
    Http().bindAndHandle(route, "localhost", 8080)

  }
}

With this code I get two compilation errors:

Error:(29, 28) value fromIterator is not a member of object scala.collection.immutable.Stream
val mapStream = Stream.fromIterator(() => map.toIterator)

Error:(38, 11) overloaded method value apply with alternatives:
  (contentType: akka.http.scaladsl.model.ContentType,file: java.io.File,chunkSize: Int)akka.http.scaladsl.model.UniversalEntity <and>
  (contentType: akka.http.scaladsl.model.ContentType,data: akka.stream.scaladsl.Source[akka.util.ByteString,Any])akka.http.scaladsl.model.HttpEntity.Chunked <and>
  (contentType: akka.http.scaladsl.model.ContentType,data: akka.util.ByteString)akka.http.scaladsl.model.HttpEntity.Strict <and>
  (contentType: akka.http.scaladsl.model.ContentType,bytes: Array[Byte])akka.http.scaladsl.model.HttpEntity.Strict <and>
  (contentType: akka.http.scaladsl.model.ContentType.NonBinary,string: String)akka.http.scaladsl.model.HttpEntity.Strict
 cannot be applied to (akka.http.scaladsl.model.ContentType.WithCharset, List[akka.util.ByteString])
          HttpEntity(`text/csv`, mapStream)

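I worked around the first problem by using a list of tuples (I don't know how to stream a map in Scala). I have no idea how to solve the second one. Thanks for your help.

(I am using Scala 2.11.8)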

Ram*_*gil 5

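Use the `apply` function of `HttpEntity` that takes a `Source[ByteString, Any]`. That `apply` creates a `Chunked` entity. Based on the documentation for streaming file IO with akka streams, you can read your file as a `Source`: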

import java.nio.file.Paths

import akka.stream.scaladsl._

val file = Paths.get("yourFile.csv")

// `text/csv` is the ContentType value defined in the question
val entity = HttpEntity(`text/csv`, FileIO.fromPath(file))

The stream breaks your file into chunks; the chunk size currently defaults to 8192 bytes.
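If the default chunk size does not suit you, it can be passed explicitly. The following is only a sketch, assuming an akka-stream version that provides `FileIO.fromPath` with a `chunkSize` parameter; the `CsvFileEntity` object and its `apply` helper are hypothetical names used for illustration.

```scala
import java.nio.file.Path

import akka.http.scaladsl.model.{ContentType, HttpCharsets, HttpEntity, MediaTypes}
import akka.stream.scaladsl.FileIO

// Hypothetical helper, not part of akka-http.
object CsvFileEntity {
  val `text/csv` = ContentType(MediaTypes.`text/csv`, HttpCharsets.`UTF-8`)

  // Builds a chunked entity that reads `file` in pieces of at most `chunkSize` bytes.
  // Nothing is read until the entity's data stream is materialized.
  def apply(file: Path, chunkSize: Int = 8192): HttpEntity.Chunked =
    HttpEntity(`text/csv`, FileIO.fromPath(file, chunkSize))
}
```

For example, `CsvFileEntity(Paths.get("yourFile.csv"), chunkSize = 1024)` would read the file in pieces of at most 1024 bytes.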

To stream the map you created, you can use a similar technique:

val mapStream = Source.fromIterator(() => map.toIterator)
                      // pattern-match on the (key, value) tuple; "\n" ends each CSV row
                      .map { case (k, v) => s"$k,$v\n" }
                      .map(ByteString(_))

val mapEntity = HttpEntity(`text/csv`, mapStream)
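Putting the pieces together, a complete program might look like the sketch below. It assumes the same generation of akka and akka-http as the question (`ActorMaterializer` and `bindAndHandle`, both of which were deprecated in later releases); `CsvStreaming`, `csvSource` and `csvEntity` are hypothetical names introduced for illustration.

```scala
import akka.NotUsed
import akka.actor.ActorSystem
import akka.http.scaladsl.Http
import akka.http.scaladsl.model.{ContentType, HttpCharsets, HttpEntity, MediaTypes}
import akka.http.scaladsl.server.Directives._
import akka.stream.ActorMaterializer
import akka.stream.scaladsl.Source
import akka.util.ByteString

object CsvStreaming {
  val `text/csv` = ContentType(MediaTypes.`text/csv`, HttpCharsets.`UTF-8`)

  // Emits one ByteString per CSV row. Accepts mutable and immutable maps;
  // note that a HashMap does not guarantee any particular row order.
  def csvSource(rows: scala.collection.Map[String, Int]): Source[ByteString, NotUsed] =
    Source.fromIterator(() => rows.iterator)
      .map { case (k, v) => ByteString(s"$k,$v\n") }

  // The Source-based HttpEntity.apply overload yields a Chunked entity.
  def csvEntity(rows: scala.collection.Map[String, Int]): HttpEntity.Chunked =
    HttpEntity(`text/csv`, csvSource(rows))

  def main(args: Array[String]): Unit = {
    implicit val system: ActorSystem = ActorSystem("system")
    implicit val materializer: ActorMaterializer = ActorMaterializer()

    val data = Map("a" -> 1, "b" -> 2, "c" -> 3)

    val route =
      path("test") {
        get {
          complete(csvEntity(data))
        }
      }

    Http().bindAndHandle(route, "localhost", 8080)
  }
}
```

With the server running, a GET request to `http://localhost:8080/test` should return the rows as a chunked `text/csv` response. Because the entity is built from a `Source`, the rows are produced as the client consumes the response rather than being assembled into one string up front.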