Dav*_*ria 5 scala amazon-sqs akka akka-stream aws-sdk
I am using Akka Streams in Scala to poll an AWS SQS queue with the AWS Java SDK. I created an ActorPublisher that dequeues messages at a two-second interval:
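import akka.actor.{ActorLogging, Props}
import akka.stream.ActorMaterializer
import akka.stream.actor.ActorPublisher
import akka.stream.actor.ActorPublisherMessage.{Cancel, Request}
import com.amazonaws.regions.RegionUtils
import com.amazonaws.services.sqs.AmazonSQSClient
import com.amazonaws.services.sqs.model.{Message, ReceiveMessageRequest}
import scala.annotation.tailrec
import scala.collection.JavaConversions.iterableAsScalaIterable
import scala.concurrent.duration._

// Assumed companion: the application code calls SQSSubscriber.props(...)
object SQSSubscriber {
  def props(name: String): Props = Props(new SQSSubscriber(name))
}

class SQSSubscriber(name: String) extends ActorPublisher[Message] with ActorLogging {
  import context.dispatcher // ExecutionContext required by the scheduler

  implicit val materializer = ActorMaterializer()

  // Send "dequeue" to this actor every 2 seconds
  val schedule = context.system.scheduler.schedule(0 seconds, 2 seconds, self, "dequeue")

  val client = new AmazonSQSClient()
  client.setRegion(RegionUtils.getRegion("us-east-1"))
  val url = client.getQueueUrl(name).getQueueUrl

  val MaxBufferSize = 100
  var buf = Vector.empty[Message]

  override def receive: Receive = {
    case "dequeue" =>
      // Poll the queue once and forward each message to this actor
      val messages = iterableAsScalaIterable(client.receiveMessage(new ReceiveMessageRequest(url)).getMessages).toList
      messages.foreach(self ! _)
    case message: Message if buf.size == MaxBufferSize =>
      log.error("The buffer is full")
    case message: Message =>
      if (buf.isEmpty && totalDemand > 0)
        onNext(message)
      else {
        buf :+= message
        deliverBuf()
      }
    case Request(_) =>
      deliverBuf()
    case Cancel =>
      context.stop(self)
  }

  // Emit as many buffered messages as downstream demand allows
  @tailrec final def deliverBuf(): Unit =
    if (totalDemand > 0) {
      if (totalDemand <= Int.MaxValue) {
        val (use, keep) = buf.splitAt(totalDemand.toInt)
        buf = keep
        use foreach onNext
      } else {
        val (use, keep) = buf.splitAt(Int.MaxValue)
        buf = keep
        use foreach onNext
        deliverBuf()
      }
    }
}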
In my application, I am also attempting to run the flow at a two-second interval:
val system = ActorSystem("system")
import system.dispatcher // ExecutionContext required by scheduler.schedule

val sqsSource = Source.actorPublisher[Message](SQSSubscriber.props("queue-name"))
val flow = Flow[Message]
  .map { elem => system.log.debug(s"${elem.getBody} (${elem.getMessageId})"); elem }
  .to(Sink.ignore)

system.scheduler.schedule(0 seconds, 2 seconds) {
  flow.runWith(sqsSource)(ActorMaterializer()(system))
}
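However, when I run my application I receive java.util.concurrent.TimeoutException: Futures timed out after [20000 milliseconds], followed by dead-letter notifications that mention the ActorMaterializer.
Is there a recommended approach to continuously materializing an Akka Stream?
I don't think you need to create a new ActorPublisher every 2 seconds. That is redundant and wastes memory. I also don't think an ActorPublisher is necessary at all. From what I can tell of the code, your implementation will end up with an ever-growing number of Streams all querying the same data. Each Message from the client will be processed by N different akka Streams and, even worse, N will grow over time.
Iterator For Infinite Loop Querying
You can get the same behavior as your ActorPublisher by using Scala's Iterator. It is possible to create an Iterator that continuously queries the client: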
import scala.collection.JavaConversions.iterableAsScalaIterable

// set up the client
val client = {
  val sqsClient = new AmazonSQSClient()
  sqsClient setRegion (RegionUtils getRegion "us-east-1")
  sqsClient
}

// `name` is the queue name, as in the question
val url = client.getQueueUrl(name).getQueueUrl

// single query
def queryClientForMessages: Iterable[Message] = iterableAsScalaIterable {
  client.receiveMessage(new ReceiveMessageRequest(url)).getMessages
}

// endless sequence of query results, one batch per element
def messageListIterator: Iterator[Iterable[Message]] =
  Iterator continually queryClientForMessages

// messages one-at-a-time "on demand", no timer pushing you around
def messageIterator(): Iterator[Message] = messageListIterator flatMap identity
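This implementation only queries the client once all previous Messages have been consumed, and is therefore truly reactive. There is no need to keep track of a fixed-size buffer. Your solution needs a buffer because the creation of Messages (driven by a timer) is decoupled from the consumption of Messages (the logging step in your flow). In my implementation, creation and consumption are tightly coupled via back-pressure.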
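The on-demand querying described here can be checked without AWS or Akka. The following is only a minimal sketch under stated assumptions: `FakeQueue`, `pollCount`, and `messages` are hypothetical names introduced for illustration, with `FakeQueue` standing in for the SQS client and `messages` mirroring the shape of `messageIterator`.

```scala
object LazyPollingSketch {
  // Hypothetical stand-in for the SQS client: every call to poll() returns the
  // next batch and is counted, so we can observe when queries actually happen.
  final class FakeQueue(batches: List[List[String]]) {
    private var remaining = batches
    var pollCount = 0

    def poll(): List[String] = {
      pollCount += 1
      remaining match {
        case head :: tail => remaining = tail; head
        case Nil          => Nil // an empty queue yields an empty batch
      }
    }
  }

  // Same shape as messageIterator: query repeatedly, then flatten the batches.
  def messages(queue: FakeQueue): Iterator[String] =
    Iterator.continually(queue.poll()).flatMap(batch => batch)

  def main(args: Array[String]): Unit = {
    val queue = new FakeQueue(List(List("a", "b"), List("c")))
    val it = messages(queue)

    println(queue.pollCount) // 0: building the iterator queries nothing
    it.next()                // "a", triggers the first query
    it.next()                // "b", served from the batch already fetched
    println(queue.pollCount) // 1: the second batch has not been requested yet
    it.next()                // "c", triggers the second query
    println(queue.pollCount) // 2
  }
}
```

The consumer's calls to `next()` are the only thing that triggers a query, which is why no separate buffer or timer is involved.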
Akka Stream Source
You can then use this Iterator generator function to feed an akka stream Source:
def messageSource : Source[Message, _] = Source fromIterator messageIterator
Flow Formation
Finally, you can use this Source to perform the logging. (As a side note: your flow value is actually a Sink, since Flow + Sink = Sink.) Using the flow value from the question:
messageSource runWith flow
One akka Stream processes all messages.