特别处理Akka流的第一个元素

pov*_*der 8 scala akka akka-stream

是否有一种Source以特殊方式处理Akka流的第一个元素的惯用方法?我现在拥有的是:

    var firstHandled = false
    source.map { elem =>
      if(!firstHandled) {
        //handle specially
        firstHandled = true
      } else {
        //handle normally
      }
    }
Run Code Online (Sandbox Code Playgroud)

谢谢

Mik*_*ame 8

虽然我通常会使用Ramon的答案,但您也可以使用prefixAndTail前缀为1,flatMapConcat以实现类似的功能:

val src = Source(List(1, 2, 3, 4, 5))
val fst = Flow[Int].map(i => s"First: $i")
val rst = Flow[Int].map(i => s"Rest:  $i")

val together = src.prefixAndTail(1).flatMapConcat { case (head, tail) =>
  // `head` is a Seq of the prefix elements, which in our case is
  // just the first one. We can convert it to a source of just
  // the first element, processed via our fst flow, and then
  // concatenate `tail`, which is the remainder...
  Source(head).via(fst).concat(tail.via(rst))
}

Await.result(together.runForeach(println), 10.seconds)
// First: 1
// Rest:  2
// Rest:  3
// Rest:  4
// Rest:  5
Run Code Online (Sandbox Code Playgroud)

这当然不仅适用于第一个项目,而且适用于前N个项目,条件是这些项目将被视为严格的集合.