Spring cloud SQS - 轮询间隔

yot*_*ain 7 message-queue amazon-sqs amazon-web-services aws-sdk spring-cloud

使用Spring云监听AWS SQS队列,如下所示:

@SqsListener(value = "${queue.name}", deletionPolicy = SqsMessageDeletionPolicy.ON_SUCCESS)
public void queueListener(String message, @Headers Map<String, Object> sqsHeaders) {
    // code
}
Run Code Online (Sandbox Code Playgroud)

Spring配置:

<aws-messaging:annotation-driven-queue-listener
    max-number-of-messages="10" wait-time-out="20" visibility-timeout="3600"
    amazon-sqs="awsSqsClient" />
Run Code Online (Sandbox Code Playgroud)

AwsSqsClient:

@Bean
public com.amazonaws.services.sqs.AmazonSQSAsyncClient awsSqsClient() {
    ExecutorService executorService = Executors.newFixedThreadPool(10);
    return new AmazonSQSAsyncClient(new DefaultAWSCredentialsProviderChain(), executorService);
}
Run Code Online (Sandbox Code Playgroud)

这很好用.

在SQS客户端中配置了10个线程来处理这些消息,如上所示.这也工作正常,在任何时间点处理最多10条消息.

问题是,我无法弄清楚控制轮询间隔的方法.默认情况下,一旦所有线程都空闲,弹出轮询.

即考虑以下示例

  1. 大约有3条消息被发送到Queue
  2. Spring轮询队列并获得3条消息
  3. 处理每条消息的3条消息大约需要20分钟

与此同时,大约有25条消息被送到队列.在完成之前传递的所有3条消息之前,Spring不会轮询队列.从上面的例子来看,只有20分钟之后春季民意调查,尽管有7个线程仍然免费!

知道我们如何控制这种民意调查吗?即如果有任何线程空闲,则应该开始轮询,并且不应该等到所有线程都空闲

Moo*_*ose 3

您的侦听器可以将消息加载到 Spring 应用程序中,并将它们与AcknowledgementVisibility对象一起提交到另一个线程池(如果您想控制两者)。

一旦消息提交到该线程池,您的侦听器就可以加载更多数据。您可以通过调整线程池设置来控制并发性。

您的侦听器的方法签名将类似于以下之一:

@SqsListener(value = "${queueName}", deletionPolicy = SqsMessageDeletionPolicy.NEVER)
public void listen(YourCustomPOJO pojo,
                   @Headers Map<String, Object> headers,
                   Acknowledgment acknowledgment,
                   Visibility visibility) throws Exception {
...... Send pojo to worker thread and return
Run Code Online (Sandbox Code Playgroud)

然后工作线程将确认处理成功

acknowledgment.acknowledge().get();
Run Code Online (Sandbox Code Playgroud)

确保您的消息可见性设置为大于最高处理时间的值(使用一些超时来限制执行时间)。