从队列中消耗好Scala模式,直到没有更多消息?

lau*_*ids 0 queue design-patterns scala akka

我有一个N由Actor调度的消息队列,我想全部消耗它们.如果队列为空,则actor将返回Message类型或NoMessages类型.

我想出了这个,但不觉得惯用,而且我不确定每次打电话时我都会旋转多少个线程consume()

这样做的更好方法是什么?

def main(): Unit = {

  val queue = system.actorOf(...)

  def consume(): Unit = {
    ask(queue, Read) foreach {
      case Message(m) => {
        // handle message
        consume()
      }
      case NoMessages =>  {
        system.shutdown()
      }
    }
  }
  consume()
}
Run Code Online (Sandbox Code Playgroud)

Jef*_*ung 6

如果MessageNoMessages扩展一个共同特征(让我们称之为Msg),你可以使用Akka Streams:

import akka.Done
import akka.actor._
import akka.stream._
import akka.stream.scaladsl._
import akka.util.Timeout
import scala.concurrent._
import scala.concurrent.duration._

implicit val system = ActorSystem("QueueSys")
implicit val materializer = ActorMaterializer()
implicit val ec = system.dispatcher

val queue = system.actorOf(...)

def handleMessage(msg: Message): Unit = ???

implicit val askTimeout = Timeout(5.seconds)

val stream: Future[Done] = Source.fromIterator(() => Iterator.continually(Read))
  .ask[Msg](parallelism = 3)(queue) // adjust the parallelism as needed
  .takeWhile(_.isInstanceOf[Message])
  .runForeach(handleMessage)

stream.onComplete(_ => system.terminate())
Run Code Online (Sandbox Code Playgroud)

上面的流将不断向演员发送Read消息queue并处理Message响应,直到演员回复NoMessage.