pac*_*man 3 scala akka akka-stream
我对中的扇出策略有些困惑Akka streams,我读到
Broadcast–给定一个输入元素发射到每个输出,Balance(1个输入,N个输出),而给定一个输入元素发射到其输出之一的(1个输入,N个输出)端口。
你能解释一下我吗:
从文档...广播将元素发送(发送)给每个消费者。余额仅向第一个可用的消费者发出。
发射每个传入元素n个输出中的每个。
将流散开为几个流。每个上游元素都被发送到第一个可用的下游使用者。
编辑评论:
从主旨上讲,您应该制作两个averageCarrierDelay函数,每个Z和F。然后,您可以看到发送给每个元素的所有元素。
val averageCarrierDelayZ =
Flow[FlightDelayRecord]
.groupBy(30, _.uniqueCarrier)
.fold(("", 0, 0)){
(x: (String, Int, Int), y:FlightDelayRecord) => {
println(s"Z Received Element: ${y}")
val count = x._2 + 1
val totalMins = x._3 + Try(y.arrDelayMins.toInt).getOrElse(0)
(y.uniqueCarrier, count, totalMins)
}
}.mergeSubstreams
val averageCarrierDelayF =
Flow[FlightDelayRecord]
.groupBy(30, _.uniqueCarrier)
.fold(("", 0, 0)){
(x: (String, Int, Int), y:FlightDelayRecord) => {
println(s"F Received Element: ${y}")
val count = x._2 + 1
val totalMins = x._3 + Try(y.arrDelayMins.toInt).getOrElse(0)
(y.uniqueCarrier, count, totalMins)
}
}.mergeSubstreams
Run Code Online (Sandbox Code Playgroud)
编辑2:为将来检查事情,我建议为流阶段使用通用记录器,以便您了解发生了什么。
def logElement[A](msg: String) = Flow[A].map { a => println(s"${msg} ${a}"); a }
Run Code Online (Sandbox Code Playgroud)
这样做可以使您执行以下操作:
D ~> logElement[FlightDelayRecord]("F received: ") ~> F
D ~> logElement[FlightDelayRecord]("Z received: ") ~> Z
Run Code Online (Sandbox Code Playgroud)
通过这种方式,您可以检查图形区域中可能有或没有预期的奇怪行为。
| 归档时间: |
|
| 查看次数: |
1762 次 |
| 最近记录: |