lau*_*ids 0 queue design-patterns scala akka
我有一个N由Actor调度的消息队列,我想全部消耗它们.如果队列为空,则actor将返回Message类型或NoMessages类型.
我想出了这个,但不觉得惯用,而且我不确定每次打电话时我都会旋转多少个线程consume()?
这样做的更好方法是什么?
def main(): Unit = {
val queue = system.actorOf(...)
def consume(): Unit = {
ask(queue, Read) foreach {
case Message(m) => {
// handle message
consume()
}
case NoMessages => {
system.shutdown()
}
}
}
consume()
}
Run Code Online (Sandbox Code Playgroud)
如果Message并NoMessages扩展一个共同特征(让我们称之为Msg),你可以使用Akka Streams:
import akka.Done
import akka.actor._
import akka.stream._
import akka.stream.scaladsl._
import akka.util.Timeout
import scala.concurrent._
import scala.concurrent.duration._
implicit val system = ActorSystem("QueueSys")
implicit val materializer = ActorMaterializer()
implicit val ec = system.dispatcher
val queue = system.actorOf(...)
def handleMessage(msg: Message): Unit = ???
implicit val askTimeout = Timeout(5.seconds)
val stream: Future[Done] = Source.fromIterator(() => Iterator.continually(Read))
.ask[Msg](parallelism = 3)(queue) // adjust the parallelism as needed
.takeWhile(_.isInstanceOf[Message])
.runForeach(handleMessage)
stream.onComplete(_ => system.terminate())
Run Code Online (Sandbox Code Playgroud)
上面的流将不断向演员发送Read消息queue并处理Message响应,直到演员回复NoMessage.
| 归档时间: |
|
| 查看次数: |
127 次 |
| 最近记录: |