DefaultMessageListenerContainer 停止处理消息

Jim*_* M. 2 spring amazon-sqs amazon-web-services spring-jms spring-cloud

我希望这是一个简单的配置问题,但我似乎无法弄清楚它可能是什么。

设置

  • Spring-Boor 2.2.2.RELEASE
  • 云启动器
  • 云启动器 aws
  • 弹簧JMS
  • spring-cloud-依赖项 Hoxton.SR1
  • 亚马逊-sqs-java-messaging-lib 1.0.8

问题

我的应用程序启动正常并开始处理来自 Amazon SQS 的消息。一段时间后,我看到以下警告

2020-02-01 04:16:21.482 LogLevel=WARN 1 --- [ecutor-thread14] osjlDefaultMessageListenerContainer:计划的消费者数量已降至并发消费者限制以下,可能是由于任务被拒绝。检查你的线程池配置!由剩余消费者触发自动恢复。

上述警告被打印多次,最终我看到以下两条信息消息

2020-02-01 04:17:51.552 LogLevel=INFO 1 --- [ecutor-thread40] casjavamessaging.SQSMessageConsumer :关闭 ConsumerPrefetch 执行器

2020-02-01 04:18:06.640 LogLevel=INFO 1 --- [ecutor-thread40] com.amazon.sqs.javamessaging.SQSSession :关闭 SessionCallBackScheduler 执行程序

上述 2 条消息将显示多次,并且在某个时刻,SQS 不再消耗任何消息。我在日志中没有看到任何其他消息表明存在问题,但我没有从处理程序收到任何消息表明它们正在处理消息(我有 2 个~),并且我可以看到 AWS SQS 队列的消息数量不断增长,并且年龄。

~:当我有一个处理程序时,这个确切的代码工作正常,当我添加第二个处理程序时,这个问题就开始了。

配置/代码

我意识到的第一个“警告”是由ThreadPoolTask​​Executor的货币引起的,但我无法获得正常工作的配置。这是我当前的 JMS 配置,我尝试了各种级别的最大池大小,除了根据池大小迟早启动警告之外,没有任何实际影响

    public ThreadPoolTaskExecutor asyncAppConsumerTaskExecutor() {
        ThreadPoolTaskExecutor taskExecutor = new ThreadPoolTaskExecutor();
        taskExecutor.setThreadGroupName("asyncConsumerTaskExecutor");
        taskExecutor.setThreadNamePrefix("asyncConsumerTaskExecutor-thread");
        taskExecutor.setCorePoolSize(10);
        // Allow the thread pool to grow up to 4 times the core size, evidently not
        // having the pool be larger than the max concurrency causes the JMS queue
        // to barf on itself with messages like
        // "Number of scheduled consumers has dropped below concurrentConsumers limit, probably due to tasks having been rejected. Check your thread pool configuration! Automatic recovery to be triggered by remaining consumers"
        taskExecutor.setMaxPoolSize(10 * 4);
        taskExecutor.setQueueCapacity(0); // do not queue up messages
        taskExecutor.setWaitForTasksToCompleteOnShutdown(true);
        taskExecutor.setAwaitTerminationSeconds(60);
        return taskExecutor;
    }
Run Code Online (Sandbox Code Playgroud)

这是我们创建的JMS容器工厂

    public DefaultJmsListenerContainerFactory jmsListenerContainerFactory(SQSConnectionFactory sqsConnectionFactory, ThreadPoolTaskExecutor asyncConsumerTaskExecutor) {
        DefaultJmsListenerContainerFactory factory = new DefaultJmsListenerContainerFactory();
        factory.setConnectionFactory(sqsConnectionFactory);
        factory.setDestinationResolver(new DynamicDestinationResolver());
        // The JMS processor will start 'concurrency' number of tasks
        // and supposedly will increase this to the max of '10 * 3'
        factory.setConcurrency(10 + "-" + (10 * 3));
        factory.setTaskExecutor(asyncConsumerTaskExecutor);
        // Let the task process 100 messages, default appears to be 10
        factory.setMaxMessagesPerTask(100);
        // Wait up to 5 seconds for a timeout, this keeps the task around a bit longer
        factory.setReceiveTimeout(5000L);
        factory.setSessionAcknowledgeMode(Session.CLIENT_ACKNOWLEDGE);
        return factory;
    }
Run Code Online (Sandbox Code Playgroud)

我根据在互联网上找到的内容添加了setMaxMessagesPerTasksetReceiveTimeout调用,没有这些以及在各种设置(50、2500L、25、1000L 等...)下问题仍然存在。

我们创建一个默认的SQS连接工厂

    public SQSConnectionFactory sqsConnectionFactory(AmazonSQS amazonSQS) {
        return new SQSConnectionFactory(new ProviderConfiguration(), amazonSQS);
    }
Run Code Online (Sandbox Code Playgroud)

最后处理程序看起来像这样

    @JmsListener(destination = "consumer-event-queue")
    public void receiveEvents(String message) throws IOException {
        MyEventDTO myEventDTO = jsonObj.readValue(message, MyEventDTO.class);
        //messageTask.process(myEventDTO);
    }

    @JmsListener(destination = "myalert-sqs")
    public void receiveAlerts(String message) throws IOException, InterruptedException {
        final MyAlertDTO myAlert = jsonObj.readValue(message, MyAlertDTO.class);
        myProcessor.addAlertToQueue(myAlert);
    }
Run Code Online (Sandbox Code Playgroud)

您可以看到,在第一个函数(receiveEvents)中,我们只是从队列中取出消息并退出,我们还没有为此实现处理代码。第二个函数 ( receiveAlerts ) 获取消息,myProcessor.addAlertToQueue函数创建一个可运行对象并将其提交到线程池以在将来的某个时刻进行处理。

当我们添加receiveAlerts函数时,问题才开始(警告、信息和消费消息失败),之前只有另一个函数存在,我们没有看到这种行为。

更多的

这是一个较大项目的一部分,我正在努力将此代码分解为一个较小的测试用例,看看是否可以重复此问题。我将发布后续结果。

同时

我希望这只是一个配置问题,更熟悉此问题的人可以告诉我我做错了什么,或者有人可以提供一些关于如何纠正此问题以使其正常工作的想法和评论。

谢谢你!

Jim*_* M. 5

经过一番斗争后,我想我终于解决了这个问题。

该问题似乎是由于“DefaultJmsListenerContainerFactory”引起的,该工厂使用“@JmsListener”注释为每个方法创建了一个新的“DefaultJmsListenerContainer”。最初编写代码的人认为它只为应用程序调用一次,创建的容器将被重复使用。所以问题有两个方面

  1. 附加到工厂的“ThreadPoolTask​​Executor”有 40 个线程,当应用程序有 1 个“@JmsListener”方法时,效果很好,但是当我们添加第二个方法时,每个方法都有 10 个线程(总共 20 个)用于监听。不过,这很好;由于我们声明每个侦听器最多可以增长 30 个侦听器,因此我们很快就耗尽了上面 1 中提到的池中的线程。这导致了“预定消费者数量已降至并发消费者限制以下”错误
  2. 考虑到上述情况,这可能是显而易见的,但我想明确指出这一点。然而,在监听器工厂中,我们将并发设置为“10-30”;所有听众都必须共享该池。因此,必须设置最大并发数,以便每个侦听器的最大值足够小,这样如果每个侦听器创建其最大值,它就不会超过池中的最大线程数(例如,如果我们有 2 个“@JmsListener” ' 带注释的方法和具有 40 个线程的池,则最大值不能超过 20)。

希望这可以帮助将来遇到类似问题的其他人......