Akka-Streams收集数据(来源 - >流量 - >流量(收集) - >接收器)

Cem*_*ser 3 scala akka-stream

我是斯卡拉和阿卡的全新人物.我有一个简单的RunnableFlow:

Source -> Flow (do some transformation) -> Sink.runForeach
Run Code Online (Sandbox Code Playgroud)

现在我想要这样的东西:

Source -> Flow1 (do some transformation) -> Flow2 (do some transformation) -> Sink.runForeach
Run Code Online (Sandbox Code Playgroud)

但Flow2应该等到Flow1中的100个元素可用,然后将这100个元素转换为新元素(需要Flow1中的所有100个元素)并将此新元素提供给Sink.

我做了一些研究并找到了明确的用户定义缓冲区,但我不明白如何从flow2中的flow1访问所有100个元素并使用它们进行一些转换.有人可以解释一下吗?或者甚至更好地发布一个简单的小例子?或两者?

Ram*_*gil 9

Akka Defined Collection

如果您不介意使用akka确定的集合类型,那么您可以使用该grouped函数:

//alternative stream formation
val stream = Source(1 to 100).via(Flow[Int].grouped(bufferSize))
                             .runWith(Sink foreach println)
Run Code Online (Sandbox Code Playgroud)

用户定义的集合

如果要控制用于缓冲区的集合类型,例如a SeqArray:

type MyCollectionType[X] = Array[X]

def emptyMyCollection[X] : MyCollectionType[X] = Array.empty[X]
Run Code Online (Sandbox Code Playgroud)

然后,您可以使用两个Flow执行此操作.第一个Flow执行a scan来构建一系列元素:

val bufferSize = 10

def appendToMyCollection[X](coll : MyCollectionType[X], i : X) : MyCollectionType[X] = 
  (if(coll.size < bufferSize) coll else emptyMyCollection[Int]) :+ i

val buffer : Flow[Int, MyCollectionType[Int], _] = 
  Flow[Int].scan[MyCollectionType[Int]](emptyMyCollection[Int]){
    (coll, i) => appendToMyCollection(coll, i)
  }
Run Code Online (Sandbox Code Playgroud)

第二个Flow是一个filter具有恰当大小的序列(即"goldiLocks"):

val goldiLocks : Flow[MyCollectionType[Int], MyCollectionType[Int],_] =
  Flow[MyCollectionType[Int]].filter(_.size == bufferSize)
Run Code Online (Sandbox Code Playgroud)

这两个Flow可以组合在一起生成一个Stream,它将生成所需的集合类型:

val stream = Source(1 to 100).via(buffer)
                             .via(goldiLocks)
                             .runWith(Sink foreach println)
Run Code Online (Sandbox Code Playgroud)

  • `Flow [Int] .grouped(bufferSize)`更简单. (3认同)