我正在研究的项目需要从SQS读取消息,我决定使用Akka来分发这些消息的处理.
由于SQS是Camel支持的,并且内置了在Consumer类中使用Akka的功能,我想最好以这种方式实现端点和读取消息,尽管我没有看到很多人这样做的例子.
我的问题是我不能足够快地轮询我的队列以保持我的队列空,或接近空.我最初的想法是,我可以让消费者从SQS以X/s的速率接收来自Camel的消息.从那里,我可以简单地创建更多的消费者,以达到我需要处理消息的速度.
我的消费者:
import akka.camel.{CamelMessage, Consumer}
import akka.actor.{ActorRef, ActorPath}
class MyConsumer() extends Consumer {
def endpointUri = "aws-sqs://my_queue?delay=1&maxMessagesPerPoll=10&accessKey=myKey&secretKey=RAW(mySecret)"
var count = 0
def receive = {
case msg: CamelMessage => {
count += 1
}
case _ => {
println("Got something else")
}
}
override def postStop(){
println("Count for actor: " + count)
}
}
Run Code Online (Sandbox Code Playgroud)
如图所示,我设置delay=1以及&maxMessagesPerPoll=10提高消息速率,但我无法使用相同的端点生成多个消费者.
我在文档中读到了这一点By default endpoints are assumed not to support multiple consumers.,我相信这也适用于SQS端点,因为产生多个消费者只会给我一个消费者,在运行系统一分钟后,输出消息Count for actor: x而不是输出的其他消息Count for …