在Akka等待多个结果

Kev*_*vin 7 nonblocking actor akka

等待Akka中多个演员结果的正确方法是什么?

反应性规划课程的原理课程有一个复制的键值存储的练习.在不进入赋值细节的情况下,它需要等待多个actor的确认才能指示复制完成.

我使用包含未完成请求的可变映射实现了赋值,但我觉得解决方案有"难闻的气味".我希望有更好的方法来实现看似常见的场景.

为了通过隐瞒我的练习解决方案来维护课程的荣誉代码,我有一个描述类似问题的抽象用例.

发票行项目需要计算其纳税义务.纳税义务是跨多个税务机关(例如,联邦,州,警区)应用于项目的所有税收的组合.如果每个征税机构都是能够确定订单项税务责任的行为者,则该订单项需要所有参与者报告才能继续报告整体纳税义务.在Akka完成这种情况的最佳/正确方法是什么?

cmb*_*ter 5

这是我相信您正在寻找的简化示例。它显示了像演员这样的主人如何催生一些童工,然后等待他们的所有响应,处理可能发生超时等待结果的情况。该解决方案显示了如何等待初始请求,然后在等待响应时切换到新的接收功能。它还显示了如何将状态传播到等待的接收函数中,以避免必须在实例级别具有显式的可变状态。

object TaxCalculator {
  sealed trait TaxType
  case object StateTax extends TaxType
  case object FederalTax extends TaxType
  case object PoliceDistrictTax extends TaxType
  val AllTaxTypes:Set[TaxType] = Set(StateTax, FederalTax, PoliceDistrictTax)

  case class GetTaxAmount(grossEarnings:Double)
  case class TaxResult(taxType:TaxType, amount:Double)  

  case class TotalTaxResult(taxAmount:Double)
  case object TaxCalculationTimeout
}

class TaxCalculator extends Actor{
 import TaxCalculator._
 import context._
 import concurrent.duration._

  def receive =  waitingForRequest

  def waitingForRequest:Receive = {
    case gta:GetTaxAmount =>
      val children = AllTaxTypes map (tt => actorOf(propsFor(tt)))
      children foreach (_ ! gta)
      setReceiveTimeout(2 seconds)
      become(waitingForResponses(sender, AllTaxTypes))
  }

  def waitingForResponses(respondTo:ActorRef, expectedTypes:Set[TaxType], taxes:Map[TaxType, Double] = Map.empty):Receive = {
    case TaxResult(tt, amount) =>
      val newTaxes = taxes ++ Map(tt -> amount)
      if (newTaxes.keySet == expectedTypes){
        respondTo ! TotalTaxResult(newTaxes.values.foldLeft(0.0)(_+_))
        context stop self
      }
      else{
        become(waitingForResponses(respondTo, expectedTypes, newTaxes))
      }

    case ReceiveTimeout =>
      respondTo ! TaxCalculationTimeout
      context stop self
  }

  def propsFor(taxType:TaxType) = taxType match{
    case StateTax => Props[StateTaxCalculator]
    case FederalTax => Props[FederalTaxCalculator]
    case PoliceDistrictTax => Props[PoliceDistrictTaxCalculator]
  }  
}

trait TaxCalculatingActor extends Actor{  
  import TaxCalculator._
  val taxType:TaxType
  val percentage:Double

  def receive = {
    case GetTaxAmount(earnings) => 
      val tax = earnings * percentage
      sender ! TaxResult(taxType, tax)
  }
}

class FederalTaxCalculator extends TaxCalculatingActor{
  val taxType = TaxCalculator.FederalTax
  val percentage = 0.20
}

class StateTaxCalculator extends TaxCalculatingActor{
  val taxType = TaxCalculator.StateTax
  val percentage = 0.10
}

class PoliceDistrictTaxCalculator extends TaxCalculatingActor{
  val taxType = TaxCalculator.PoliceDistrictTax
  val percentage = 0.05
}
Run Code Online (Sandbox Code Playgroud)

然后,您可以使用以下代码对此进行测试:

import TaxCalculator._
import akka.pattern.ask
import concurrent.duration._
implicit val timeout = Timeout(5 seconds)

val system = ActorSystem("taxes")
import system._
val cal = system.actorOf(Props[TaxCalculator])
val fut = cal ? GetTaxAmount(1000.00)
fut onComplete{
  case util.Success(TotalTaxResult(amount)) =>
    println(s"Got tax total of $amount")
  case util.Success(TaxCalculationTimeout) =>
    println("Got timeout calculating tax")
  case util.Failure(ex) => 
    println(s"Got exception calculating tax: ${ex.getMessage}")
}
Run Code Online (Sandbox Code Playgroud)


alm*_*dar 2

这是 Akka 中非常常见的问题。你有多个演员可以为你完成这项工作,你需要将他们结合起来。

Jammie Allen 在他的书《Effective Akka》(关于从各种类型的帐户中获取银行帐户余额)中提出的解决方案是,您生成一个 Actor,该 Actor 将生成多个执行此工作的 Actor(例如计算税款)。它将等待他们所有人的回答。

一个你不应该使用ask但应该插入的陷阱tell

当您生成多个参与者(例如 FederalTaxactor、StateTaxActor...)时,您向他们发送一条消息,其中包含他们需要处理的数据。然后你就知道需要收集多少答案。对于每个回复,您都会检查所有回复是否都在那里。如果没有你就等着吧。

问题是,如果任何参与者失败,你可能会永远等待。所以你给自己安排了一条超时消息。如果没有找到所有答案,您将返回操作未成功完成的信息。

Akka 有一个特殊的实用程序,可以为您自己安排超时,可以作为一个很好的帮手。