用于处理接收中的异步操作的Akka模式

Arn*_*sen 2 io asynchronous scala akka

我有一个接收指标数据点的Actor,并定期聚合并将它们保存到磁盘.后一种操作会进行I/O操作,因此我不想使用阻塞操作.但是,如果我将其切换为异步,如何在聚合完成之前阻止接收其他数据点而不阻塞某处.

我见过的一种模式是使用Stash,如下所示:

class Aggregator extends Actor with Stash {
  def receive = processing

  def processing: Receive = {
    case "aggregate" => {
      context.become(aggregating)
      aggregate().onComplete {
        case Success => self ! "aggregated"
        case Failure => self ! "aggregated"
      }
    }
    case msg => ??? // Process task
  }

  def aggregating: Receive = {
    case "aggregated" =>
      unstashAll()
      context.become(processing)
    case msg =>
      stash()
  }
}
Run Code Online (Sandbox Code Playgroud)

我对此的不满是我的集体行动的完成只是任何人都可以发送的信息.据我了解,我不能在我的未来完成中影响"不合适".

作为旁注,我无法确定完成onComplete是否以某种方式由同一个调度程序执行receive,因为如果它们不是,则完成会破坏演员提供的单线程保护.

或者是否有更好的模式来完成非同步和直接内部的操作,receive同时保证我的状态在完成之前不能被更改?似乎这种情况任何时候演员状态处理任何类型的I/O(如数据库),显然你会想要避免同步I/O,如果可以的话.

maa*_*asg 5

您的聚合器actor目前正在做两件事:聚合和存储.您可以通过拆分这两项任务来解决问题并简化系统.该单责任的原则也适用于行为者.

我将创建一个专门的编写actor和一个用于保存聚合数据的消息类.这个actor子系统应该如下所示: Aggregator-Store actor子系统

理想情况下,写入磁盘所需的时间比聚合间隔短,因此系统保持稳定.在出现峰值的情况下,DataStore actor的队列将作为缓冲区用于将消息写入存储.

根据您的应用程序,您可能需要实现某种形式的确认和重试,以防您希望确保已编写聚合数据.