akka:用于组合来自多个孩子的消息的模式

Mul*_*efa 8 concurrency scala mapreduce akka

这是我遇到的模式:

一个演员A有多个孩子C1,...... , Cn. 收到消息后,A将其发送给每个子节点,每个子节点对消息进行一些计算,并在完成时将其发送回A.A然后,我想把所有孩子的结果结合起来传递给另一个演员.

这个问题的解决方案是什么样的?或者这是反模式?在哪种情况下应该如何处理这个问题?

这是一个简单的例子,希望能够说明我目前的解决方案.我担心的是重复的代码(直到对称); 并没有很好地延伸到"很多"的孩子; 并且很难看出发生了什么.

import akka.actor.{Props, Actor}

case class Tagged[T](value: T, id: Int)

class A extends Actor {
  import C1._
  import C2._

  val c1 = context.actorOf(Props[C1], "C1")
  val c2 = context.actorOf(Props[C2], "C2")
  var uid = 0
  var c1Results = Map[Int, Int]()
  var c2Results = Map[Int, Int]()

  def receive = {
    case n: Int => {
      c1 ! Tagged(n, uid)
      c2 ! Tagged(n, uid)
      uid += 1
    }
    case Tagged(C1Result(n), id) => c2Results get id match {
      case None => c1Results += (id -> n)
      case Some(m) => {
        c2Results -= id
        context.parent ! (n, m)
      }
    }
    case Tagged(C2Result(n), id) => c1Results get id match {
      case None => c2Results += (id -> n)
      case Some(m) => {
        c1Results -= id
        context.parent ! (m, n)
      }
    }
  }
}

class C1 extends Actor {
  import C1._

  def receive = {
    case Tagged(n: Int, id) => Tagged(C1Result(n), id)
  }
}

object C1 {
  case class C1Result(n: Int)
}

class C2 extends Actor {
  import C2._

  def receive = {
    case Tagged(n: Int, id) => Tagged(C2Result(n), id)
  }
}    

object C2 {
  case class C2Result(n: Int)
}
Run Code Online (Sandbox Code Playgroud)

如果您认为代码看起来很糟糕,请放轻松我,我刚刚开始学习akka;)

小智 7

在许多(或不同数量)儿童演员的情况下,Zim-Zam提出的提问模式将很快失控.

聚合模式旨在帮助这种情况.它提供了一个聚合器特征,您可以在actor中使用它来执行聚合逻辑.

想要执行聚合的客户端actor可以启动基于聚合器的actor实例,并向其发送一条消息,该消息将启动聚合过程.

应为每个聚合操作创建一个新的聚合器,并在发送回结果时终止(当它收到所有响应或超时时).

下面列出了这种模式的示例,该模式用于对由Child类表示的actor所持有的整数值求和.(注意,它们不需要由同一个父actor监督的所有孩子:SummationAggregator只需要一个ActorRef集合.)

import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration._

import akka.actor._
import akka.contrib.pattern.Aggregator

object Child {
  def props(value: Int): Props = Props(new Child(value))

  case object GetValue
  case class GetValueResult(value: Int)
}

class Child(value: Int) extends Actor {
  import Child._

  def receive = { case GetValue => sender ! GetValueResult(value) }
}

object SummationAggregator {
  def props = Props(new SummationAggregator)

  case object TimedOut
  case class StartAggregation(targets: Seq[ActorRef])
  case object BadCommand
  case class AggregationResult(sum: Int)
}

class SummationAggregator extends Actor with Aggregator {
  import Child._
  import SummationAggregator._

  expectOnce {
    case StartAggregation(targets) =>
      // Could do what this handler does in line but handing off to a 
      // separate class encapsulates the state a little more cleanly
      new Handler(targets, sender())
    case _ =>
      sender ! BadCommand
      context stop self
  }

  class Handler(targets: Seq[ActorRef], originalSender: ActorRef) {
    // Could just store a running total and keep track of the number of responses 
    // that we are awaiting...
    var valueResults = Set.empty[GetValueResult]

    context.system.scheduler.scheduleOnce(1.second, self, TimedOut)

    expect {
      case TimedOut =>
        // It might make sense to respond with what we have so far if some responses are still awaited...
        respondIfDone(respondAnyway = true)
    }

    if (targets.isEmpty)
      respondIfDone()
    else
      targets.foreach { t =>
        t ! GetValue
        expectOnce {
          case vr: GetValueResult =>
            valueResults += vr
            respondIfDone()
        }
      }

    def respondIfDone(respondAnyway: Boolean = false) = {
      if (respondAnyway || valueResults.size == targets.size) {
        originalSender ! AggregationResult(valueResults.foldLeft(0) { case (acc, GetValueResult(v)) => acc + v })
        context stop self
      }
    }
  }
}
Run Code Online (Sandbox Code Playgroud)

要使用您的父actor中的SummationAggregator,您可以:

context.actorOf(SummationAggregator.props) ! StartAggregation(children)
Run Code Online (Sandbox Code Playgroud)

然后在父接收的某处处理AggregationResult.


Zim*_*oot 5

你可以使用?而不是!在儿童演员身上 - 这会导致儿童演员Future用他们的(最终)结果返回一个,即一切都仍然没有阻塞,直到你Await的结果Future.然后父作者可以将它们组合Futures并发送给另一个演员 - 它已经知道每个Future's标识,因此您不必担心标记每条消息,以便以后可以将它们按顺序放回.这是一个简单的例子,其中每个孩子返回一个随机的Double,你想要将第一个孩子的返回值除以第二个孩子的返回值(即顺序很重要).

import scala.concurrent.duration._

import akka.actor.{Props, Actor}
import akka.pattern.{ask, pipe}
import akka.util.Timeout

class A extends Actor {
  val c1 = context.actorOf(Props[C], "C1")
  val c2 = context.actorOf(Props[C], "C2")

  // The ask operation involves creating an internal actor for handling 
  // this reply, which needs to have a timeout after which it is
  // destroyed in order not to leak resources; see more below.
  implicit val timeout = Timeout(5 seconds)

  def receive = {
    case _ => {
      val f1 = c1 ? "anything" // Future[Any]
      val f2 = c2 ? "anything" // Future[Any]
      val result: Future[Double] = for {
        d1 <- f1.mapTo[Double]
        d2 <- f2.mapTo[Double]
      } yield d1 / d2
  }
}

class C extends Actor {
  def receive = {
    case _ => // random Double
  }
}
Run Code Online (Sandbox Code Playgroud)