jde*_*lop 5 priority-queue publish-subscribe actor akka
我需要将不同类型的消息发布到事件流,并且这些消息应该具有不同的优先级,例如,如果已经发布了10个类型A的消息,并且毕竟发布了一个类型B的消息,并且B的优先级高于即使队列中有10个A类消息,A - 消息B的优先级也应由下一个actor获取.
我在这里阅读了有关优先级消息的内容,并创建了我对该邮箱的简单实现:
class PrioritizedMailbox(settings: Settings, cfg: Config) extends UnboundedPriorityMailbox(
PriorityGenerator {
case ServerPermanentlyDead => println("Priority:0"); 0
case ServerDead => println("Priority:1"); 1
case _ => println("Default priority"); 10
}
)
Run Code Online (Sandbox Code Playgroud)
然后我在application.conf中配置它
akka {
actor {
prio-dispatcher {
type = "Dispatcher"
mailbox-type = "mailbox.PrioritizedMailbox"
}
}
}
Run Code Online (Sandbox Code Playgroud)
并连接到我的演员:
private val myActor = actors.actorOf(
Props[MyEventHandler[T]].
withRouter(RoundRobinRouter(HIVE)).
withDispatcher("akka.actor.prio-dispatcher").
withCreator(
new Creator[Actor] {
def create() = new MyEventHandler(storage)
}), name = "eventHandler")
Run Code Online (Sandbox Code Playgroud)
我正在使用ActorSystem.eventStream.publish来发送消息,我的演员订阅了它(我可以在日志中看到消息被处理,但是按FIFO顺序).
但是看起来还不够,因为在日志/控制台中我从未见过像"默认优先级"这样的消息.我在这里错过了什么吗?描述的方法是使用事件流还是直接调用在actor上发送消息?如何使用eventStream获取优先级消息?
Vik*_*ang 10
你的问题是你的演员疯狂地快,所以在他们有时间排队之前处理消息,所以邮箱不能进行任何优先处理.下面的例子证明了这一点:
trait Foo
case object X extends Foo
case object Y extends Foo
case object Z extends Foo
class PrioritizedMailbox(settings: ActorSystem.Settings, cfg: Config)
extends UnboundedPriorityMailbox(
PriorityGenerator {
case X ? 0
case Y ? 1
case Z ? 2
case _ ? 10
})
val s = ActorSystem("prio", com.typesafe.config.ConfigFactory.parseString(
""" prio-dispatcher {
type = "Dispatcher"
mailbox-type = "%s"
}""".format(classOf[PrioritizedMailbox].getName)))
val latch = new java.util.concurrent.CountDownLatch(1)
val a = s.actorOf(Props(new akka.actor.Actor {
latch.await // Just wait here so that the messages are queued up
inside the mailbox
def receive = {
case any ? /*println("Processing: " + any);*/ sender ! any
}
}).withDispatcher("prio-dispatcher"))
implicit val sender = testActor
a ! "pig"
a ! Y
a ! Z
a ! Y
a ! X
a ! Z
a ! X
a ! "dog"
latch.countDown()
Seq(X, X, Y, Y, Z, Z, "pig", "dog") foreach { x => expectMsg(x) }
s.shutdown()
Run Code Online (Sandbox Code Playgroud)
这个测试通过了飞行的颜色
归档时间: |
|
查看次数: |
3522 次 |
最近记录: |