并行处理量简单队列服务(SQS)

Mur*_*lli 2 java spring amazon-sqs spring-cloud

我正在使用 Spring Cloud 来使用简单队列服务(SQS)。我有以下并行处理配置:

@Bean
public SimpleAsyncTaskExecutor simpleAsyncTaskExecutor() {
    SimpleAsyncTaskExecutor simpleAsyncTaskExecutor = new SimpleAsyncTaskExecutor();
    simpleAsyncTaskExecutor.setConcurrencyLimit(50);
    return simpleAsyncTaskExecutor;
}

@Bean
public SimpleMessageListenerContainerFactory simpleMessageListenerContainerFactory(
        SimpleAsyncTaskExecutor simpleAsyncTaskExecutor) {

    SimpleMessageListenerContainerFactory factory = new SimpleMessageListenerContainerFactory();
    factory.setAutoStartup(true);
    factory.setTaskExecutor(simpleAsyncTaskExecutor);
    factory.setWaitTimeOut(20);
    factory.setMaxNumberOfMessages(10);

    return factory;
}
Run Code Online (Sandbox Code Playgroud)

我需要在 50 个线程中处理 50 条消息(在 bean SimpleAsyncTaskExecutor 中配置),但仅并行处理 10 条消息(从 SQS 返回的 maxNumberOfMessages)

如何处理 50 条消息而不是 10 条?

Mur*_*lli 5

我找到了解决方案。

需要在方法上标注@Async,更改deletionPolicyNEVER,并在执行结束时删除消息。

这样,队列消耗将遵循配置的线程数。例如,如果您有 50 个线程,则会在 SQS 队列中发出 5 个请求(每个请求 10 条消息),从而总共并行处理 50 条消息。

代码如下所示:

@Async
@SqsListener(value = "sqsName", deletionPolicy = SqsMessageDeletionPolicy.NEVER)
public void consume(String message, Acknowledgment acknowledgment) throws InterruptedException, ExecutionException {

    //your code

    acknowledgment.acknowledge().get(); //To delete message from queue
}
Run Code Online (Sandbox Code Playgroud)