Hba*_*jar 7 java amazon-sqs spring-boot spring-cloud
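I have an AWS SQS queue that already holds 5000 messages (a sample message looks like 'Hello @ 1'). I created a Spring Boot application and, in one of its component classes, wrote a method that reads messages from SQS.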
package com.example.aws.sqs.service;

import org.springframework.cloud.aws.messaging.listener.SqsMessageDeletionPolicy;
import org.springframework.cloud.aws.messaging.listener.annotation.SqsListener;
import org.springframework.stereotype.Component;
import lombok.extern.slf4j.Slf4j;

@Component
@Slf4j
public class MessageReceiverService {

    @SqsListener(value = { "${cloud.aws.sqs.url}" }, deletionPolicy = SqsMessageDeletionPolicy.ALWAYS)
    public void readMessage(String message) {
        log.info("Reading Message... {}", message);
    }
}
My main Spring Boot class:
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;

@SpringBootApplication
public class AwsSqsApplicationConsumer {

    public static void main(String[] args) {
        SpringApplication.run(AwsSqsApplicationConsumer.class, args);
    }
}
The exception I get when I run the application:
s.c.a.m.l.SimpleMessageListenerContainer : An Exception occurred while polling queue '<my sqs name>'. The failing operation will be retried in 10000 milliseconds
org.springframework.core.task.TaskRejectedException: Executor [java.util.concurrent.ThreadPoolExecutor@7c1594a5[Running, pool size = 3, active threads = 3, queued tasks = 0, completed tasks = 20]] did not accept task: org.springframework.cloud.aws.messaging.listener.SimpleMessageListenerContainer$SignalExecutingRunnable@1cbd9ef2
at org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor.execute(ThreadPoolTaskExecutor.java:309) ~[spring-context-5.0.7.RELEASE.jar:5.0.7.RELEASE]
at org.springframework.cloud.aws.messaging.listener.SimpleMessageListenerContainer$AsynchronousMessageListener.run(SimpleMessageListenerContainer.java:286) ~[spring-cloud-aws-messaging-2.0.0.RELEASE.jar:2.0.0.RELEASE]
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) [na:1.8.0_65]
at java.util.concurrent.FutureTask.run(FutureTask.java:266) [na:1.8.0_65]
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) [na:1.8.0_65]
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) [na:1.8.0_65]
at java.lang.Thread.run(Thread.java:745) [na:1.8.0_65]
Caused by: java.util.concurrent.RejectedExecutionException: Task org.springframework.cloud.aws.messaging.listener.SimpleMessageListenerContainer$SignalExecutingRunnable@1cbd9ef2 rejected from java.util.concurrent.ThreadPoolExecutor@7c1594a5[Running, pool size = 3, active threads = 2, queued tasks = 0, completed tasks = 20]
at java.util.concurrent.ThreadPoolExecutor$AbortPolicy.rejectedExecution(ThreadPoolExecutor.java:2047) ~[na:1.8.0_65]
at java.util.concurrent.ThreadPoolExecutor.reject(ThreadPoolExecutor.java:823) [na:1.8.0_65]
at java.util.concurrent.ThreadPoolExecutor.execute(ThreadPoolExecutor.java:1369) [na:1.8.0_65]
at org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor.execute(ThreadPoolTaskExecutor.java:306) ~[spring-context-5.0.7.RELEASE.jar:5.0.7.RELEASE]
... 6 common frames omitted
I have not configured any custom Executor service; I am using the preconfigured Spring beans. springBootVersion = '2.0.3.RELEASE', springCloudVersion = 'Finchley.RELEASE'.
Setting the maximum number of messages seems to solve the problem:
@Bean
public SimpleMessageListenerContainerFactory simpleMessageListenerContainerFactory(AmazonSQSAsync amazonSQS) {
    SimpleMessageListenerContainerFactory factory = new SimpleMessageListenerContainerFactory();
    factory.setAmazonSqs(amazonSQS);
    // Setting this explicitly also drives the sizing of the default task executor
    factory.setMaxNumberOfMessages(10);
    return factory;
}
小智 5
I think this is a bug or an oversight in Spring. The problem stems from the following defaults:
public class SimpleMessageListenerContainer extends AbstractMessageListenerContainer {
    private static final int DEFAULT_WORKER_THREADS = 2;
and
abstract class AbstractMessageListenerContainer implements InitializingBean, DisposableBean, SmartLifecycle, BeanNameAware {
    private static final int DEFAULT_MAX_NUMBER_OF_MESSAGES = 10;
If maxNumberOfMessages is not set, the container uses 10 as the number of messages to pull from SQS and 2 as the number of worker threads in the task executor. That means that as soon as it pulls 3 or more messages at once, you hit that exception. If you set maxNumberOfMessages manually to a value (any value), it uses that value in both places, which keeps the two in sync, as I believe it should:
@Bean
public SimpleMessageListenerContainer simpleMessageListenerContainer(
        SimpleMessageListenerContainerFactory factory, QueueMessageHandler messageHandler) {
    SimpleMessageListenerContainer container = factory.createSimpleMessageListenerContainer();
    container.setMaxNumberOfMessages(5);
    container.setMessageHandler(messageHandler);
    return container;
}
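The mismatch between batch size and pool size can be reproduced without SQS or Spring. The following is a minimal sketch, not the library's own code: it uses a plain JDK ThreadPoolExecutor with no queue capacity, a maximum pool size of 3 (the pool size reported in the stack trace) and a batch of 10 tasks (the default number of messages per poll). The class name RejectionDemo and the method countRejections are hypothetical names chosen for illustration, and treating a SynchronousQueue as equivalent to the listener's "queued tasks = 0" executor is an assumption.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

class RejectionDemo {

    /**
     * Submits batchSize tasks that all block until released, and returns
     * how many of them the executor rejected.
     */
    static int countRejections(int corePoolSize, int maxPoolSize, int batchSize) {
        // SynchronousQueue holds no tasks, so a task is either handed to a
        // thread immediately or rejected (the default AbortPolicy throws).
        ThreadPoolExecutor executor = new ThreadPoolExecutor(
                corePoolSize, maxPoolSize, 60, TimeUnit.SECONDS, new SynchronousQueue<>());
        CountDownLatch release = new CountDownLatch(1);
        int rejected = 0;
        try {
            for (int i = 0; i < batchSize; i++) {
                try {
                    executor.execute(() -> {
                        try {
                            release.await(); // simulate a handler that is still busy
                        } catch (InterruptedException e) {
                            Thread.currentThread().interrupt();
                        }
                    });
                } catch (RejectedExecutionException e) {
                    rejected++; // every thread is busy and nothing can be queued
                }
            }
        } finally {
            release.countDown(); // let the blocked workers finish
            executor.shutdown();
            try {
                executor.awaitTermination(5, TimeUnit.SECONDS);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }
        return rejected;
    }

    public static void main(String[] args) {
        // Pool capped at 3 threads, batch of 10: prints 7
        System.out.println(countRejections(2, 3, 10));
        // Pool capped at the batch size: prints 0
        System.out.println(countRejections(2, 10, 10));
    }
}
```

With the pool capped at 3, only the first three tasks find a thread and the remaining seven are rejected; once the cap is at least the batch size, nothing is rejected. This matches the behavior the answers describe when maxNumberOfMessages and the executor size are kept in sync.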
The problem lies in the listener thread configuration. See the following:
...
ThreadPoolExecutor@7c1594a5[Running, pool size = 3, active threads = 3, queued tasks = 0, completed tasks = 20]]
...
The default thread pool size is smaller than what you need.
Add the following configuration to your Spring application:
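import org.springframework.context.annotation.Configuration;
import org.springframework.scheduling.annotation.SchedulingConfigurer;
import org.springframework.scheduling.concurrent.ThreadPoolTaskScheduler;
import org.springframework.scheduling.config.ScheduledTaskRegistrar;

@Configuration
public class TasksConfiguration implements SchedulingConfigurer {

    @Override
    public void configureTasks(ScheduledTaskRegistrar taskRegistrar) {
        ThreadPoolTaskScheduler taskScheduler = new ThreadPoolTaskScheduler();
        taskScheduler.setPoolSize(5); // TODO: Load this from configuration
        taskScheduler.initialize();
        taskRegistrar.setTaskScheduler(taskScheduler);
    }
}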
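Now you should be able to process these tasks.
PS: Any tasks that were rejected earlier will be picked up again after a certain period.
Edit: I think people were put off by the number in .setPoolSize(5000). It is a configurable number, and you can pick whatever suits your requirements. For this answer I have reduced it to a smaller number.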