使用Akka Actors的OutOfMemoryError

pla*_*oom 5 scala rabbitmq actor akka

我有一个使用RabbitMQ消息的应用程序,我正在使用Actors来处理工作.

这是我的方法:

object QueueConsumer extends Queue {

  def consumeMessages = {
    setupListener(buildChannel(resultsQueueName), resultsQueueName,
        resultsCallback)
  }

  private def setupListener(receivingChannel: Channel, queue: String, 
        f: (String) => Any) {
    Akka.system.scheduler.scheduleOnce(Duration(10, TimeUnit.SECONDS),
      Akka.system.actorOf(Props(new QueueActor(receivingChannel, queue, f))), "")
  }

}

class QueueActor(channel:Channel, queue:String, f:(String) => Any) extends Actor{

  def receive = {
    case _ => startReceiving
  }

  def startReceiving = {
    val consumer = new QueueingConsumer(channel)
    channel.basicConsume(queue, false, consumer)
    while (true) {
      val delivery = consumer.nextDelivery()
      val msg = new String(delivery.getBody())
      context.actorOf(Props(new Actor {
    def receive = {
      case some: String => f(some)
    }
      })) ! msg
      channel.basicAck(delivery.getEnvelope.getDeliveryTag, false)
    }
  }

}
Run Code Online (Sandbox Code Playgroud)

运行几秒钟后,它会抛出java.lang.OutOfMemoryError:超出GC开销限制.

我认为它正在发生,因为我正在为我收到的每条消息开始一个新的Actor - 所以如果我有100000条消息,它将创建10万个演员.这是一个好方法还是我应该实现像'演员池'这样的东西?

任何人都知道如何在我的场景中避免OutOfMemoryError?

预先感谢.

EDIT1:

改变方法:

class Queue2(json:String) extends Actor {

  def receive = {
    case x: String =>
      val envelope = MessageEnvelopeParser.toObject(x)
      val processor = ProcessQueueServiceFactory.getProcessResultsService()
      envelope.messages.foreach(message => processor.process(message))
  }

}

object Queue2 {
  def props(json: String): Props = Props(new Queue2(json))
}

class QueueActor(channel:Channel, queue:String) extends Actor {

  def receive = {
    case _ => startReceiving
  }

  def startReceiving = {
    val consumer = new QueueingConsumer(channel)
    channel.basicConsume(queue, false, consumer)
    while (true) {
      val delivery = consumer.nextDelivery()
      val msg = new String(delivery.getBody())
      context.actorOf(Queue2.props(msg))
      channel.basicAck(delivery.getEnvelope.getDeliveryTag, false)
    }
  }
}
Run Code Online (Sandbox Code Playgroud)

Rol*_*uhn 2

您的每条消息演员在完成后需要停止自己,否则他们将永远留在身边。请参阅有关Actor 生命周期停止 Actor 的文档。这里\xe2\x80\x99只需context.stop(self)处理完成后添加即可。

\n