如何将Source [ByteString,Any]转换为InputStream

kos*_*tya 30 scala akka akka-stream

akka-http表示使用multipart/form-data编码上传的文件Source[ByteString, Any].我需要使用期望的Java库解组它InputStream.

怎么Source[ByteString, Any]可以变成一个InputStream

Ben*_*ger 23

从版本2.x开始,您可以使用以下代码实现此目的:

import akka.stream.scaladsl.StreamConverters
...
val inputStream: InputStream = entity.dataBytes
        .runWith(
           StreamConverters.asInputStream(FiniteDuration(3, TimeUnit.SECONDS))
        )
Run Code Online (Sandbox Code Playgroud)

请参阅:http://doc.akka.io/docs/akka-stream-and-http-experimental/2.0.1/scala/migration-guide-1.0-2.x-scala.html

注意:在版本2.0.2中已中断,在2.4.2中已修复

  • 在这种情况下,@ lisak可以执行data.runFold(ByteString.empty)(_ ++ _)。toArray,其中数据的类型为Source [ByteString,Any] (2认同)
  • 它会在物化时将整个数据集加载到内存中吗? (2认同)

cmb*_*ter 7

您可以尝试使用OutputStreamSink写入a PipedOutputStream并将其反馈到PipedInputStream您的其他代码用作其输入流的那个.这是一个有点粗略的想法,但它可以工作.代码如下所示:

import akka.util.ByteString
import akka.stream.scaladsl.Source
import java.io.PipedInputStream
import java.io.PipedOutputStream
import akka.stream.io.OutputStreamSink
import java.io.BufferedReader
import java.io.InputStreamReader
import akka.actor.ActorSystem
import akka.stream.ActorFlowMaterializer

object PipedStream extends App{
  implicit val system = ActorSystem("flowtest")
  implicit val mater = ActorFlowMaterializer()

  val lines = for(i <- 1 to 100) yield ByteString(s"This is line $i\n")
  val source = Source(lines)

  val pipedIn = new PipedInputStream()
  val pipedOut = new PipedOutputStream(pipedIn)      
  val flow = source.to(OutputStreamSink(() => pipedOut))
  flow.run()

  val reader = new BufferedReader(new InputStreamReader(pipedIn))
  var line:String = reader.readLine
  while(line != null){
    println(s"Reader received line: $line")
    line = reader.readLine
  }           
}
Run Code Online (Sandbox Code Playgroud)

  • 我说"不太可能"因为它设置得当它不会发生.但是,例如,假设您正在测试,并且您正在使用CallingThreadDispatcher进行所有操作,那么它将陷入僵局. (2认同)