Akka框架支持查找重复的消息

sca*_*ome 5 scala akka

我正在尝试用Akka和Scala构建一个高性能的分布式系统.

如果请求昂贵(和无副作用)计算的消息到来,并且之前已经请求了完全相同的计算,我想避免再次计算结果.如果先前请求的计算已经完成并且结果可用,我可以缓存它并重新使用它.

然而,可以请求重复计算的时间窗口可以是任意小的.例如,对于所有实际目的,我可以在同一时刻获得一千或一百万条消息请求相同的昂贵计算.

据称有一种名为Gigaspaces的商业产品可以处理这种情况.

但是,目前似乎没有框架支持在Akka处理重复的工作请求.鉴于Akka框架已经可以访问通过框架路由的所有消息,看起来框架解决方案在这里可能很有意义.

以下是我为Akka框架提出的建议:1.创建一个特征来指示要遵循以下缓存方法的消息类型(例如,"ExpensiveComputation"或类似的东西).2.智能地(散列等)识别在用户可配置的时间窗口内由(相同或不同的)演员接收的相同消息.其他选项:选择要用于此目的的最大内存缓冲区大小,取决于(比如LRU)替换等.Akka还可以选择仅缓存处理成本高昂的消息的结果; 如果需要,可以再次重新处理花费很少时间处理的消息; 无需浪费宝贵的缓冲空间来缓存它们及其结果.3.当识别出相同的消息(在该时间窗口内接收,可能"在同一时刻")时,避免不必要的重复计算.框架会自动执行此操作,实际上,重复的消息永远不会被新的actor接收以进行处理; 他们会默默地消失,并且处理过一次的结果(无论该计算是在过去已经完成,还是在当时正在进行)将被发送给所有适当的接收者(如果已经可用,则立即发送,如果没有,则完成计算).请注意,即使"回复"字段不同,也应认为消息相同,只要它们表示的语义/计算在其他方面都相同即可.还要注意,计算应该是纯粹的功能,即没有副作用,因为建议的缓存优化工作并且根本不改变程序语义.

如果我的建议与Akka的做事方式不兼容,和/或如果你看到一些强烈的理由为什么这是一个非常糟糕的主意,请告诉我.

谢谢,太棒了,斯卡拉

Nei*_*ssy 11

你问的不是依赖于Akka框架,而是你如何构建你的演员和消息.首先确保您的消息是不可变的,并通过equals/hashCode方法具有适当定义的标识.案例类为您提供免费,但如果您在消息中嵌入了actorRef用于回复目的,则必须覆盖标识方法.case类参数也应该递归地具有相同的属性(不可变和适当的标识).

其次,您需要弄清楚演员如何处理存储和识别当前/过去的计算.最简单的方法是将请求唯一映射到actor.这种方式,演员和只有那个演员将处理该特定请求.给定一组固定的actor和请求的hashCode,这可以很容易地完成.如果演员集受到监督,其中主管正在管理负载平衡/映射并替换失败的演员(Akka使这部分变得容易),则奖励积分.

最后,actor本身可以根据您描述的标准维护响应缓存行为.在actor的上下文中,一切都是线程安全的,因此请求本身键入的LRU缓存(良好的身份属性记忆)对于您想要的任何类型的行为都很容易.


Vik*_*ang 5

正如Neil所说,这不是真正的框架功能,实现它并将其抽象为自己的特性是相当简单的.

trait CachingExpensiveThings { self: Actor =>
  val cache = ...
  def receive: Actor.Receive = {
    case s: ExpensiveThing => cachedOrCache(s)
  }

  def cacheOrCached(s: ExpensiveThing) = cache.get(s) match {
    case null => val result = compute(s)
                 cache.put(result)
                 self.reply_?)(result)
    case cached => self.reply_?)(cached)
  }
  def compute(s: ExpensiveThing): Any 
}


class MyExpensiveThingCalculator extends Actor with CachingExpensiveThings {
  def compute(s: ExpensiveThing) = {
    case l: LastDigitOfPi => ...
    case ts: TravellingSalesman => ...
  }
}
Run Code Online (Sandbox Code Playgroud)