如何从多个文件写入组装Akka Streams接收器?

Tom*_*pey 8 scala akka akka-stream playframework-2.5

我正在尝试将基于akka流的流程集成到我的Play 2.5应用程序中.我们的想法是,您可以在照片中流式传输,然后将其作为原始文件,缩略图版本和水印版本写入磁盘.

我设法使用这样的图形来实现这一点:

val byteAccumulator = Flow[ByteString].fold(new ByteStringBuilder())((builder, b) => {builder ++= b.toArray})
                                    .map(_.result().toArray)

def toByteArray = Flow[ByteString].map(b => b.toArray)

val graph = Flow.fromGraph(GraphDSL.create() {implicit builder =>
  import GraphDSL.Implicits._
  val streamFan = builder.add(Broadcast[ByteString](3))
  val byteArrayFan = builder.add(Broadcast[Array[Byte]](2))
  val output = builder.add(Flow[ByteString].map(x => Success(Done)))

  val rawFileSink = FileIO.toFile(file)
  val thumbnailFileSink = FileIO.toFile(getFile(path, Thumbnail))
  val watermarkedFileSink = FileIO.toFile(getFile(path, Watermarked))

  streamFan.out(0) ~> rawFileSink
  streamFan.out(1) ~> byteAccumulator ~> byteArrayFan.in
  streamFan.out(2) ~> output.in

  byteArrayFan.out(0) ~> slowThumbnailProcessing ~> thumbnailFileSink
  byteArrayFan.out(1) ~> slowWatermarkProcessing ~> watermarkedFileSink

  FlowShape(streamFan.in, output.out)
})

graph
Run Code Online (Sandbox Code Playgroud)

}

然后我使用这样的累加器将它连接到我的播放控制器:

val sink = Sink.head[Try[Done]]

val photoStorageParser = BodyParser { req =>
     Accumulator(sink).through(graph).map(Right.apply)
}
Run Code Online (Sandbox Code Playgroud)

问题是我的两个处理过的文件接收器没有完成,我得到的两个处理文件的大小都是零,但不是原始的.我的理论是累加器只等待我的扇出的一个输出,所以当输入流完成并且我的byteAccumulator吐出完整的文件时,到处理完成时,播放已从输出获得物化值.

所以,我的问题是:
就我的方法而言,我是否在正确的轨道上?运行这样的图形的预期行为是什么?如何将所有水槽组合在一起形成一个最终水槽?

Tom*_*pey 7

好的,经过一点帮助(安德烈亚斯在正确的轨道上),我已经到达了这个解决方案,它可以解决问题:

val rawFileSink = FileIO.toFile(file)
val thumbnailFileSink = FileIO.toFile(getFile(path, Thumbnail))
val watermarkedFileSink = FileIO.toFile(getFile(path, Watermarked))

val graph = Sink.fromGraph(GraphDSL.create(rawFileSink, thumbnailFileSink, watermarkedFileSink)((_, _, _)) {
  implicit builder => (rawSink, thumbSink, waterSink) => {
    val streamFan = builder.add(Broadcast[ByteString](2))
    val byteArrayFan = builder.add(Broadcast[Array[Byte]](2))

    streamFan.out(0) ~> rawSink
    streamFan.out(1) ~> byteAccumulator ~> byteArrayFan.in

    byteArrayFan.out(0) ~> processorFlow(Thumbnail) ~> thumbSink
    byteArrayFan.out(1) ~> processorFlow(Watermarked) ~> waterSink

    SinkShape(streamFan.in)
  }
})

graph.mapMaterializedValue[Future[Try[Done]]](fs => Future.sequence(Seq(fs._1, fs._2, fs._3)).map(f => Success(Done)))
Run Code Online (Sandbox Code Playgroud)

之后很容易从Play中调用它:

val photoStorageParser = BodyParser { req =>
  Accumulator(theSink).map(Right.apply)
}

def createImage(path: String) = Action(photoStorageParser) { req =>
  Created
}
Run Code Online (Sandbox Code Playgroud)