Kev*_*vin 7 nonblocking actor akka
等待Akka中多个演员结果的正确方法是什么?
反应性规划课程的原理课程有一个复制的键值存储的练习.在不进入赋值细节的情况下,它需要等待多个actor的确认才能指示复制完成.
我使用包含未完成请求的可变映射实现了赋值,但我觉得解决方案有"难闻的气味".我希望有更好的方法来实现看似常见的场景.
为了通过隐瞒我的练习解决方案来维护课程的荣誉代码,我有一个描述类似问题的抽象用例.
发票行项目需要计算其纳税义务.纳税义务是跨多个税务机关(例如,联邦,州,警区)应用于项目的所有税收的组合.如果每个征税机构都是能够确定订单项税务责任的行为者,则该订单项需要所有参与者报告才能继续报告整体纳税义务.在Akka完成这种情况的最佳/正确方法是什么?
这是我相信您正在寻找的简化示例。它显示了像演员这样的主人如何催生一些童工,然后等待他们的所有响应,处理可能发生超时等待结果的情况。该解决方案显示了如何等待初始请求,然后在等待响应时切换到新的接收功能。它还显示了如何将状态传播到等待的接收函数中,以避免必须在实例级别具有显式的可变状态。
object TaxCalculator {
sealed trait TaxType
case object StateTax extends TaxType
case object FederalTax extends TaxType
case object PoliceDistrictTax extends TaxType
val AllTaxTypes:Set[TaxType] = Set(StateTax, FederalTax, PoliceDistrictTax)
case class GetTaxAmount(grossEarnings:Double)
case class TaxResult(taxType:TaxType, amount:Double)
case class TotalTaxResult(taxAmount:Double)
case object TaxCalculationTimeout
}
class TaxCalculator extends Actor{
import TaxCalculator._
import context._
import concurrent.duration._
def receive = waitingForRequest
def waitingForRequest:Receive = {
case gta:GetTaxAmount =>
val children = AllTaxTypes map (tt => actorOf(propsFor(tt)))
children foreach (_ ! gta)
setReceiveTimeout(2 seconds)
become(waitingForResponses(sender, AllTaxTypes))
}
def waitingForResponses(respondTo:ActorRef, expectedTypes:Set[TaxType], taxes:Map[TaxType, Double] = Map.empty):Receive = {
case TaxResult(tt, amount) =>
val newTaxes = taxes ++ Map(tt -> amount)
if (newTaxes.keySet == expectedTypes){
respondTo ! TotalTaxResult(newTaxes.values.foldLeft(0.0)(_+_))
context stop self
}
else{
become(waitingForResponses(respondTo, expectedTypes, newTaxes))
}
case ReceiveTimeout =>
respondTo ! TaxCalculationTimeout
context stop self
}
def propsFor(taxType:TaxType) = taxType match{
case StateTax => Props[StateTaxCalculator]
case FederalTax => Props[FederalTaxCalculator]
case PoliceDistrictTax => Props[PoliceDistrictTaxCalculator]
}
}
trait TaxCalculatingActor extends Actor{
import TaxCalculator._
val taxType:TaxType
val percentage:Double
def receive = {
case GetTaxAmount(earnings) =>
val tax = earnings * percentage
sender ! TaxResult(taxType, tax)
}
}
class FederalTaxCalculator extends TaxCalculatingActor{
val taxType = TaxCalculator.FederalTax
val percentage = 0.20
}
class StateTaxCalculator extends TaxCalculatingActor{
val taxType = TaxCalculator.StateTax
val percentage = 0.10
}
class PoliceDistrictTaxCalculator extends TaxCalculatingActor{
val taxType = TaxCalculator.PoliceDistrictTax
val percentage = 0.05
}
Run Code Online (Sandbox Code Playgroud)
然后,您可以使用以下代码对此进行测试:
import TaxCalculator._
import akka.pattern.ask
import concurrent.duration._
implicit val timeout = Timeout(5 seconds)
val system = ActorSystem("taxes")
import system._
val cal = system.actorOf(Props[TaxCalculator])
val fut = cal ? GetTaxAmount(1000.00)
fut onComplete{
case util.Success(TotalTaxResult(amount)) =>
println(s"Got tax total of $amount")
case util.Success(TaxCalculationTimeout) =>
println("Got timeout calculating tax")
case util.Failure(ex) =>
println(s"Got exception calculating tax: ${ex.getMessage}")
}
Run Code Online (Sandbox Code Playgroud)
这是 Akka 中非常常见的问题。你有多个演员可以为你完成这项工作,你需要将他们结合起来。
Jammie Allen 在他的书《Effective Akka》(关于从各种类型的帐户中获取银行帐户余额)中提出的解决方案是,您生成一个 Actor,该 Actor 将生成多个执行此工作的 Actor(例如计算税款)。它将等待他们所有人的回答。
一个你不应该使用ask但应该插入的陷阱tell。
当您生成多个参与者(例如 FederalTaxactor、StateTaxActor...)时,您向他们发送一条消息,其中包含他们需要处理的数据。然后你就知道需要收集多少答案。对于每个回复,您都会检查所有回复是否都在那里。如果没有你就等着吧。
问题是,如果任何参与者失败,你可能会永远等待。所以你给自己安排了一条超时消息。如果没有找到所有答案,您将返回操作未成功完成的信息。
Akka 有一个特殊的实用程序,可以为您自己安排超时,可以作为一个很好的帮手。