小编Jef*_*oie的帖子

使用Akka,SQS和Camel的消费者民意调查率

我正在研究的项目需要从SQS读取消息,我决定使用Akka来分发这些消息的处理.

由于SQS是Camel支持的,并且内置了在Consumer类中使用Akka的功能,我想最好以这种方式实现端点和读取消息,尽管我没有看到很多人这样做的例子.

我的问题是我不能足够快地轮询我的队列以保持我的队列空,或接近空.我最初的想法是,我可以让消费者从SQS以X/s的速率接收来自Camel的消息.从那里,我可以简单地创建更多的消费者,以达到我需要处理消息的速度.

我的消费者:

import akka.camel.{CamelMessage, Consumer}
import akka.actor.{ActorRef, ActorPath}

class MyConsumer() extends Consumer {
  def endpointUri = "aws-sqs://my_queue?delay=1&maxMessagesPerPoll=10&accessKey=myKey&secretKey=RAW(mySecret)"
  var count = 0

  def receive = {
    case msg: CamelMessage => {
      count += 1
    }
    case _ => {
      println("Got something else")
    }
  }

  override def postStop(){
    println("Count for actor: " + count)
  }
}
Run Code Online (Sandbox Code Playgroud)

如图所示,我设置delay=1以及&maxMessagesPerPoll=10提高消息速率,但我无法使用相同的端点生成多个消费者.

我在文档中读到了这一点By default endpoints are assumed not to support multiple consumers.,我相信这也适用于SQS端点,因为产生多个消费者只会给我一个消费者,在运行系统一分钟后,输出消息Count for actor: x而不是输出的其他消息Count for …

scala apache-camel amazon-sqs akka

6
推荐指数
1
解决办法
1743
查看次数

标签 统计

akka ×1

amazon-sqs ×1

apache-camel ×1

scala ×1