假设我正在向Actor发送消息,当它正在处理一条消息时,可能会出现更多消息.现在,当它准备好处理下一条消息时,我希望它只处理最新的消息,因为之前的消息已经过时了.我怎样才能做到最好?
使用scala Actors库我可以通过首先检查发件人来实现这一目的,如下所示:
if (myActor.getState != Runnable)
myActor ! message
Run Code Online (Sandbox Code Playgroud)
但我不认为我可以在Akka系统中进行这样的测试
Vik*_*ang 11
无需实现自己的邮箱.完全没有.
删除了大量文本,让这段代码说明一切:
// Either implement "equals" so that every job is unique (by default) or do another comparison in the match.
class Work
case class DoWork(work: Work)
class WorkerActor extends Actor {
// Left as an exercise for the reader, it clearly should do some work.
def perform(work: Work): Unit = ()
def lookingForWork: Receive = {
case w: Work =>
self forward DoWork(w)
context become prepareToDoWork(w)
}
def prepareToDoWork(work: Work): Receive = {
case DoWork(`work`) =>
// No new work, so perform this one
perform(work)
// Now we're ready to look for new work
context become lookingForWork
case DoWork(_) =>
// Discard work that we don't need to do anymore
case w2: Work =>
// Prepare to do this newer work instead
context become prepareToDoWork(w2)
}
//We start out as looking for work
def receive = lookingForWork
}
Run Code Online (Sandbox Code Playgroud)
这意味着只有在邮箱中没有较新的工作时才会执行工作.
您可以实现自己的邮箱,这种方法不会影响您的actor实现.有关演员实施更改而不是自定义邮箱实现的解决方案,请参阅此答案.
删除旧邮件的邮箱的实现enqueue:
package akka.actor.test
import akka.actor.{ ActorRef, ActorSystem }
import com.typesafe.config.Config
import akka.dispatch.{Envelope, MessageQueue}
class SingleMessageMailbox extends akka.dispatch.MailboxType {
// This constructor signature must exist, it will be called by Akka
def this(settings: ActorSystem.Settings, config: Config) = this()
// The create method is called to create the MessageQueue
final override def create(owner: Option[ActorRef], system: Option[ActorSystem]): MessageQueue =
new MessageQueue {
val message = new java.util.concurrent.atomic.AtomicReference[Envelope]
final def cleanUp(owner: ActorRef, deadLetters: MessageQueue): Unit =
Option(message.get) foreach {deadLetters.enqueue(owner, _)}
def enqueue(receiver: ActorRef, handle: Envelope): Unit =
for {e <- Option(message.getAndSet(handle))}
receiver.asInstanceOf[InternalActorRef].
provider.deadLetters.
tell(DeadLetter(e.message, e.sender, receiver), e.sender)
def dequeue(): Envelope = message.getAndSet(null)
def numberOfMessages: Int = Option(message.get).size
def hasMessages: Boolean = message.get != null
}
}
Run Code Online (Sandbox Code Playgroud)
请注意,我有这个类添加到包akka.actor发送旧邮件使用一纸空文InternalActorRef样实现的BoundedQueueBasedMessageQueue.
如果您只想跳过旧消息,可以enqueue像这样实现:
def enqueue(receiver: ActorRef, handle: Envelope): Unit = message.set(handle)
Run Code Online (Sandbox Code Playgroud)
用法:
object Test extends App {
import akka.actor._
import com.typesafe.config.ConfigFactory
// you should use your config file instead of ConfigFactory.parseString
val actorSystem: ActorSystem =
ActorSystem("default", ConfigFactory.parseString(
"""
akka.daemonic=on
myMailbox.mailbox-type = "akka.actor.test.SingleMessageMailbox"
"""))
class EchoActor extends Actor {
def receive = {
case m => println(m); Thread.sleep(500)
}
}
val actor = actorSystem.actorOf(Props[EchoActor].withMailbox("myMailbox"))
for {i <- 1 to 10} {
actor ! i
Thread.sleep(100)
}
Thread.sleep(1000)
}
Run Code Online (Sandbox Code Playgroud)
测试:
$ sbt run
1
[INFO] <dead letters log>
[INFO] <dead letters log>
[INFO] <dead letters log>
5
[INFO] <dead letters log>
[INFO] <dead letters log>
[INFO] <dead letters log>
[INFO] <dead letters log>
10
Run Code Online (Sandbox Code Playgroud)
另请参见akka/Mailboxes.