我目前的申请是基于akka 1.1.它有多个ProjectAnalysisActors负责处理特定项目的分析任务.当这样的actor接收到通用开始消息时,开始分析.完成一个步骤后,只要定义了一个步骤,它就会向下一步发送一条消息.执行代码基本上如下所示
sealed trait AnalysisEvent {
def run(project: Project): Future[Any]
def nextStep: AnalysisEvent = null
}
case class StartAnalysis() extends AnalysisEvent {
override def run ...
override def nextStep: AnalysisEvent = new FirstStep
}
case class FirstStep() extends AnalysisEvent {
override def run ...
override def nextStep: AnalysisEvent = new SecondStep
}
case class SecondStep() extends AnalysisEvent {
...
}
class ProjectAnalysisActor(project: Project) extends Actor {
def receive = {
case event: AnalysisEvent =>
val future = event.run(project)
future.onComplete { f =>
self ! event.nextStep
}
}
}
Run Code Online (Sandbox Code Playgroud)
我在如何为每个分析步骤的run-methods实现代码方面遇到了一些困难.目前,我在每个运行方法中创建了一个新的未来.在这个未来,我将所有后续消息发送到不同的子系统.其中一些是非阻塞的"即发即弃"消息,但其中一些返回的结果应该在下一个分析步骤开始之前存储.
目前,典型的运行方法如下所示
def run(project: Project): Future[Any] = {
Future {
progressActor ! typicalFireAndForget(project.name)
val calcResult = (calcActor1 !! doCalcMessage(project)).getOrElse(...)
val p: Project = ... // created updated project using calcResult
val result = (storage !! updateProjectInformation(p)).getOrElse(...)
result
}
}
Run Code Online (Sandbox Code Playgroud)
由于应该避免那些阻止消息,我想知道这是否是正确的方法.在这个用例中使用它们是否有意义,还是应该避免使用它?如果是这样,那么什么是正确的解决方案?
显然,唯一的目的ProjectAnalysisActor是连结未来的电话.其次,run方法似乎也等待结果继续计算.
所以我认为您可以尝试重构您的代码以使用Future Composition,如下所述:http://akka.io/docs/akka/1.1/scala/futures.html
def run(project: Project): Future[Any] = {
progressActor ! typicalFireAndForget(project.name)
for(
calcResult <- calcActor1 !!! doCalcMessage(project);
p = ... // created updated project using calcResult
result <- storage !!! updateProjectInformation(p)
) yield (
result
)
}
Run Code Online (Sandbox Code Playgroud)
| 归档时间: |
|
| 查看次数: |
367 次 |
| 最近记录: |