And*_*rea 14 scala throttling actor akka
我在Akka有一个演员,它将处理消息以创建某些实体.这些实体上的某些字段是根据创建时数据库中其他实体的状态计算的.
我想避免创建一个竞争条件,其中actor处理速度比数据库能够持久化实体的速度快.这可能会导致数据不一致,如下所示:
Foo并将其发送给其他actor以进行进一步处理和保存Foo.由于第一个尚未保存,因此根据DB的旧内容创建新的一个,从而产生错误Foo.现在,这种可能性非常小,因为Foos 的创建将手动触发.但仍然可以想象,双击可能会在高负荷下引起问题.谁知道明天Foo是否会自动创建.
因此,我需要的是告诉演员等待的一些方法,并且只有在确认Foo已经保存之后才恢复其操作.
有没有办法让一个演员处于空闲状态,并告诉它一段时间后恢复其操作?
基本上,我想将邮箱用作消息队列,并控制队列的处理速度.
Rol*_*uhn 24
不,你不能暂停演员:演员总是尽快从他们的邮箱中提取邮件.这只留下了传入请求被隐藏起来的可能性,以便稍后处理:
class A(db: ActorRef) extends Actor with Stash {
def receive = {
case Request =>
doWork()
db ! Persist
context.setReceiveTimeout(5.seconds)
context.become({
case Request => stash()
case Persisted => context.unbecome(); unstashAll()
case ReceiveTimeout => throw new TimeoutException("not persisted")
}, discardOld = false)
}
}
Run Code Online (Sandbox Code Playgroud)
请注意,不保证邮件传递(或数据库可能已关闭),因此建议执行超时.
这个问题主要出现在actor模型和域模型之间没有很好地对齐的情况下:actor是一致性的单位,但在你的用例中你的一致图像需要一个最新的外部实体(数据库)以便演员做正确的事.我不能在不了解用例的情况下推荐解决方案,但考虑到这一点,尝试重新构建您的问题.
事实证明,这只需要几行.这是我提出的解决方案,它与pagoda_5b建议一致:
class QueueingActor(nextActor: ActorRef) extends Actor with Stash {
import QueueingActor._
def receive = {
case message =>
context.become({
case Resume =>
unstashAll()
context.unbecome()
case _ => stash()
})
nextActor ! message
}
}
object QueueingActor {
case class Resume()
}
Run Code Online (Sandbox Code Playgroud)