Akka Streams中“平衡”和“广播”之间的区别

pac*_*man 3 scala akka akka-stream

我对中的扇出策略有些困惑Akka streams,我读到 Broadcast–给定一个输入元素发射到每个输出,Balance(1个输入,N个输出),而给定一个输入元素发射到其输出之一的(1个输入,N个输出)端口。

你能解释一下我吗:

  1. 如何平衡多个消费者?
  2. 短语“输出到其输出端口之一”的含义
  3. 下游端口是否相同?
  4. “平衡”代表输入流复制到几个输出分区中吗?
  5. “平衡使图表可以拆分,并复制下游订户的多个实例以处理该卷”是什么意思?

Bri*_*ton 5

从文档...广播将元素发送(发送)给每个消费者。余额仅向第一个可用的消费者发出。

广播

发射每个传入元素n个输出中的每个。

平衡

将流散开为几个流。每个上游元素都被发送到第一个可用的下游使用者。

编辑评论:

从主旨上讲,您应该制作两个averageCarrierDelay函数,每个ZF。然后,您可以看到发送给每个元素的所有元素。

val averageCarrierDelayZ =
    Flow[FlightDelayRecord]
      .groupBy(30, _.uniqueCarrier)
        .fold(("", 0, 0)){
          (x: (String, Int, Int), y:FlightDelayRecord) => {
            println(s"Z Received Element: ${y}")
            val count = x._2 + 1
            val totalMins = x._3 + Try(y.arrDelayMins.toInt).getOrElse(0)
            (y.uniqueCarrier, count, totalMins)
          }
        }.mergeSubstreams


val averageCarrierDelayF =
    Flow[FlightDelayRecord]
      .groupBy(30, _.uniqueCarrier)
        .fold(("", 0, 0)){
          (x: (String, Int, Int), y:FlightDelayRecord) => {
            println(s"F Received Element: ${y}")
            val count = x._2 + 1
            val totalMins = x._3 + Try(y.arrDelayMins.toInt).getOrElse(0)
            (y.uniqueCarrier, count, totalMins)
          }
        }.mergeSubstreams
Run Code Online (Sandbox Code Playgroud)

编辑2:为将来检查事情,我建议为流阶段使用通用记录器,以便您了解发生了什么。

def logElement[A](msg: String) = Flow[A].map { a => println(s"${msg} ${a}"); a }
Run Code Online (Sandbox Code Playgroud)

这样做可以使您执行以下操作:

D ~> logElement[FlightDelayRecord]("F received: ") ~> F
D ~> logElement[FlightDelayRecord]("Z received: ") ~> Z
Run Code Online (Sandbox Code Playgroud)

通过这种方式,您可以检查图形区域中可能有或没有预期的奇怪行为。