pla*_*oom 5 scala rabbitmq actor akka
我有一个使用RabbitMQ消息的应用程序,我正在使用Actors来处理工作.
这是我的方法:
object QueueConsumer extends Queue {
def consumeMessages = {
setupListener(buildChannel(resultsQueueName), resultsQueueName,
resultsCallback)
}
private def setupListener(receivingChannel: Channel, queue: String,
f: (String) => Any) {
Akka.system.scheduler.scheduleOnce(Duration(10, TimeUnit.SECONDS),
Akka.system.actorOf(Props(new QueueActor(receivingChannel, queue, f))), "")
}
}
class QueueActor(channel:Channel, queue:String, f:(String) => Any) extends Actor{
def receive = {
case _ => startReceiving
}
def startReceiving = {
val consumer = new QueueingConsumer(channel)
channel.basicConsume(queue, false, consumer)
while (true) {
val delivery = consumer.nextDelivery()
val msg = new String(delivery.getBody())
context.actorOf(Props(new Actor {
def receive = {
case some: String => f(some)
}
})) ! msg
channel.basicAck(delivery.getEnvelope.getDeliveryTag, false)
}
}
}
Run Code Online (Sandbox Code Playgroud)
运行几秒钟后,它会抛出java.lang.OutOfMemoryError:超出GC开销限制.
我认为它正在发生,因为我正在为我收到的每条消息开始一个新的Actor - 所以如果我有100000条消息,它将创建10万个演员.这是一个好方法还是我应该实现像'演员池'这样的东西?
任何人都知道如何在我的场景中避免OutOfMemoryError?
预先感谢.
EDIT1:
改变方法:
class Queue2(json:String) extends Actor {
def receive = {
case x: String =>
val envelope = MessageEnvelopeParser.toObject(x)
val processor = ProcessQueueServiceFactory.getProcessResultsService()
envelope.messages.foreach(message => processor.process(message))
}
}
object Queue2 {
def props(json: String): Props = Props(new Queue2(json))
}
class QueueActor(channel:Channel, queue:String) extends Actor {
def receive = {
case _ => startReceiving
}
def startReceiving = {
val consumer = new QueueingConsumer(channel)
channel.basicConsume(queue, false, consumer)
while (true) {
val delivery = consumer.nextDelivery()
val msg = new String(delivery.getBody())
context.actorOf(Queue2.props(msg))
channel.basicAck(delivery.getEnvelope.getDeliveryTag, false)
}
}
}
Run Code Online (Sandbox Code Playgroud)
您的每条消息演员在完成后需要停止自己,否则他们将永远留在身边。请参阅有关Actor 生命周期和停止 Actor 的文档。这里\xe2\x80\x99只需context.stop(self)处理完成后添加即可。