Akka调度模式

MyT*_*tle 5 scala akka

考虑经典的"字数"计划.它计算某些目录中所有文件中的单词数.Master接收一些目录并在Worker actor之间拆分作业(每个worker使用一个文件).这是伪代码:

class WordCountWorker extends Actor {

  def receive = {
    case FileToCount(fileName:String) =>
      val count = countWords(fileName)
      sender ! WordCount(fileName, count)
  }
}

class WordCountMaster extends Actor {
  def receive = {
    case StartCounting(docRoot) => // sending each file to worker
      val workers = createWorkers()
      fileNames = scanFiles(docRoot)
      sendToWorkers(fileNames, workers)
    case WordCount(fileName, count) => // aggregating results
      ...

  }
}
Run Code Online (Sandbox Code Playgroud)

但我希望按计划运行此字数统计程序(例如每1分钟),提供不同的扫描目录.

而Akka为调度消息传递提供了很好的方法:

system.scheduler.schedule(0.seconds, 1.minute, wordCountMaster , StartCounting(directoryName))
Run Code Online (Sandbox Code Playgroud)

但是上述调度程序的问题在调度程序通过tick发送新消息时启动,但之前的消息尚未处理(例如我发送消息扫描一些大目录,1秒后我发送另一条消息来扫描另一个目录,因此操作第一个目录的处理尚未完成).因此,我WordCountMaster将收到WordCount处理不同目录的工作人员的消息.

作为一种解决办法,而不是调度消息发送,我可以安排一些代码块,这将创建每次执行 WordCountMaster.即一个目录=一个WordCountMaster.但我觉得它效率低下,而且我需要注意提供唯一的名称WordCountMaster以避免InvalidActorNameException.

所以我的问题是:我是否应该WordCountMaster像上面提到的那样为每个刻度创建新的?或者有一些更好的想法/模式如何重新设计该程序以支持调度?


一些更新:如果每个目录创建一个主演员,我有一些问题:

  1. 命名演员的问题

InvalidActorNameException:actor名称[WordCountMaster]不是唯一的!

InvalidActorNameException:actor名称[WordCountWorker]不是唯一的!

我可以克服这个问题而不提供演员姓名.但在这种情况下,我的演员会收到自动生成的名字$a,$b等等.这对我来说并不好.

  1. 配置问题:

我想将我的路由器的配置排除在外application.conf.即我想为每个WordCountWorker路由器提供相同的配置.但由于我不控制演员姓名,所以我不能使用下面的配置,因为我不知道演员姓名:

  /wordCountWorker{
    router = smallest-mailbox-pool
    nr-of-instances = 5
    dispatcher = word-counter-dispatcher
  }
Run Code Online (Sandbox Code Playgroud)

Gre*_*man 4

我不是 Akka 专家,但我认为每个聚合都有一个 Actor 的方法并不是低效的。您需要以某种方式保持并发聚合分离。您可以为每个聚合提供一个 id,以便在唯一的主 actor 中将它们通过 id 分开,或者您可以使用 Akka actor 命名和生命周期逻辑,并将每个计数轮的每个聚合委托给将生存的 actor只是为了聚合逻辑。

对我来说,每个聚合使用一个参与者似乎更优雅。

另请注意,Akka 有一个聚合模式的实现,如此处所述