akka-stream - 如何在流/图中以不同方式处理流的最后一个元素

Rag*_*Rag 7 json scala akka akka-stream

我正在尝试实现一个Akka Streams Flow,它将JSON对象流转换为单个JSON对象数组的流.我可以Concat用来添加"["之前和"]"之后,以及Zip在元素之间插入逗号,但我无法弄清楚如何不插入最后的逗号.

我到目前为止的代码是:

trait JsonStreamSupport {

  protected def toJsonArrayString[T : Writes] = Flow[T].map(Json.toJson(_)).map(_.toString()).via(jsonArrayWrapper)


  private[this] val jsonArrayWrapper: Flow[String, String, NotUsed] = Flow.fromGraph(GraphDSL.create() { implicit b =>
    import GraphDSL.Implicits._
    val start = Source.single("[")
    val comma = Source.repeat(",")
    val end = Source.single("]")
    val concat = b.add(Concat[String](3))
    val zip = b.add(Zip[String,String])

    comma ~> zip.in1
    start ~> concat.in(0)
    zip.out.map({case (msg,delim) => msg + delim}) ~> concat.in(1)
    end ~> concat.in(2)
    FlowShape(zip.in0, concat.out)
  })
}
Run Code Online (Sandbox Code Playgroud)

目前输出是:
[{"key":"value},{"key","value"},]
但我需要它
[{"key":"value},{"key","value"}](没有最终的逗号),
其中数组的每个元素仍然是流的不同元素,因此可以例如分别通过分块HTTP发送.