当8-10名演员同时跑步时,一些Scala演员进入等待状态

Sag*_*rpe 2 multithreading scala rabbitmq

在我的模型中,大约有8-9个Scala Actors.每个actor在RabbitMQ Server上都有自己的队列

在每个Actor的act方法中.它不断地列在队列中

def act {
    this ! 1
    loop {
      react {
        case 1 => processMessage(QManager.getMessage); this ! 1
      }
    }
  } 
Run Code Online (Sandbox Code Playgroud)

我是一个rabbitMq QManager getMessage方法

def getMessage: MyObject = {
    getConnection
    val durable = true
    channel.exchangeDeclare(EXCHANGE, "direct", durable)
    channel.queueDeclare(QUEUE, durable, false, false, null)
    channel queueBind (QUEUE, EXCHANGE, _ROUTING_KEY)
    consumer = new QueueingConsumer(channel)
    channel basicConsume (QUEUE, false, consumer)

    var obj = new MyObject
    try {
      val delivery = consumer.nextDelivery
      val msg = new java.io.ObjectInputStream(
        new java.io.ByteArrayInputStream(delivery.getBody)).readObject()
      obj = msg.asInstanceOf[MyObject]
      channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false)
    } catch {
      case e: Exception =>logger.error("error in Get Message", e);endConnection
    }
    endConnection
    obj
  }
Run Code Online (Sandbox Code Playgroud)

所有9个Actors都有自己的对象类型和自己的QManager

在GetMessage中,我正在使用Rabbitmq QueueConsumer

 val delivery = consumer.nextDelivery
Run Code Online (Sandbox Code Playgroud)

并且nextDelivery方法返回一个对象,当它在队列中发现时,此方法将actor置于等待状态

当我开始所有8个演员时,其中只有4个工作正常其他未说明.我测试了每个演员都是独立运行的,当他们单独开始时他们工作得很好

当我开始更多4个演员时,问题就出现了

scala actor的线程是否存在任何问题.

Rex*_*err 5

你所有的演员都在跑; 他们永远不会休息.由于演员在一个共同的线程池中共享,这意味着幸运的赢家演员一直在运行,而不幸的输家从来没有得到任何时间.如果你想让一个实体一直为自己占用一个整个线程,通常最好使用java Thread,或者至少使用receive而不是react.您还可以增加actor池的大小以匹配actor的数量,但通常如果您有大量的actor都在运行,那么您应该更仔细地考虑如何构建程序.


Vik*_*ang 5

免责声明:我是Akka的PO

正如Rex所说,你正忙着等待,在共享的线程池上占用线程.

我不知道您是否可以选择测试Akka,但我们支持AMQP消费者(和制作人)作为演员:Akka-AMQP

生成AMQP消息:

    val exchangeParameters = ExchangeParameters("my_topic_exchange", Topic)
    val producer = AMQP.newProducer(connection, ProducerParameters(Some(exchangeParameters), producerId = Some("my_producer"))
producer ! Message("Some simple sting data".getBytes, "some.routing.key")
Run Code Online (Sandbox Code Playgroud)

使用AMQP消息:

val exchangeParameters = ExchangeParameters("my_topic_exchange", Topic)
val myConsumer = AMQP.newConsumer(connection, ConsumerParameters("some.routing.key", actorOf(new Actor { def receive = {
  case Delivery(payload, _, _, _, _, _) => log.info("Received delivery: %s", new String(payload))
}}), None, Some(exchangeParameters)))
Run Code Online (Sandbox Code Playgroud)

另一种选择是使用Akka-Camel来消费并与actor一起生成AMQP消息