使用AmazonSQSClient消耗缓慢的消息

BOT*_*Jr. 6 java amazon-sqs spring-jms spring-boot

因此,我在春季jms 50-100使用并发,允许最多连接高达200.一切都按预期工作但如果我尝试从队列中检索100k消息,我的意思是我的sqs上有100k消息,我通过弹簧读取它们jms正常的方法.

@JmsListener
Public void process (String message) {
count++;
Println (count);
//code
 }
Run Code Online (Sandbox Code Playgroud)

我在控制台中看到了所有日志,但是在大约17k之后,它开始抛出异常

像:aws sdk异常:端口已经在使用中.

为什么我会看到这个例外,怎么做.我摆脱它?

我试着在互联网上寻找它.找不到任何东西.

我的设定:

并发50-100

为每个任务设置消息:50

客户承认

timestamp=10:27:57.183, level=WARN , logger=c.a.s.j.SQSMessageConsumerPrefetch, message={ConsumerPrefetchThread-30} Encountered exception during receive in ConsumerPrefetch thread,
javax.jms.JMSException: AmazonClientException: receiveMessage.
    at com.amazon.sqs.javamessaging.AmazonSQSMessagingClientWrapper.handleException(AmazonSQSMessagingClientWrapper.java:422)
    at com.amazon.sqs.javamessaging.AmazonSQSMessagingClientWrapper.receiveMessage(AmazonSQSMessagingClientWrapper.java:339)
    at com.amazon.sqs.javamessaging.SQSMessageConsumerPrefetch.getMessages(SQSMessageConsumerPrefetch.java:248)
    at com.amazon.sqs.javamessaging.SQSMessageConsumerPrefetch.run(SQSMessageConsumerPrefetch.java:207)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)
Caused by: com.amazonaws.SdkClientException: Unable to execute HTTP request: Address already in use: connect
Run Code Online (Sandbox Code Playgroud)

更新:我找到了问题,似乎正在创建新的套接字,直到每个套接字都耗尽.

我的春季jms版本是4.3.10

要复制此问题,只需执行以上配置,最大连接为200,货币设置为50-100,并将大约40k消息推送到sqs队列.可以使用https://github.com/adamw/elasticmq作为本地堆栈服务器复制亚马逊sqs ..完成后直到这里.注释jms监听器并使用soap ui加载测试并调用send消息来触发许多消息.仅仅因为你评论了@jmslistener注释,它就不会消耗来自队列的消息.一旦您看到已发送40k消息,请停止.取消注释@jmslistener并重新启动服务器.

更新:

DefaultJmsListenerContainerFactory factory =
                new DefaultJmsListenerContainerFactory();
        factory.setConnectionFactory(connectionFactory);
        factory.setDestinationResolver(new DynamicDestinationResolver());
        factory.setErrorHandler(Throwable::printStackTrace);
        factory.setConcurrency("50-100");
        factory.setSessionAcknowledgeMode(Session.CLIENT_ACKNOWLEDGE);
        return factory;
Run Code Online (Sandbox Code Playgroud)

更新:

SQSConnectionFactory connectionFactory = new SQSConnectionFactory( new ProviderConfiguration(), amazonSQSclient);
Run Code Online (Sandbox Code Playgroud)

更新:

客户配置详情:

Protocol : HTTP
Max connections : 200
Run Code Online (Sandbox Code Playgroud)

更新:

我似乎使用了缓存连接工厂类.我读了堆栈溢出和他们的官方文档,不使用缓存连接工厂类和默认的jms监听器容器工厂.

/sf/answers/1539292681/

它给出了我之前得到的相同错误.

更新

我的目标是获得500 tps,即我应该能够消耗那么多..所以我尝试了这种方法,似乎我可以达到100-200,但不超过那个..加上这个东西是高阻挡并发..如果你使用它..如果你有更好的解决方案来实现它..我都是耳朵.

**更新**

我正在使用amazonsqsclient

Dov*_*vmo 5

消费者饥饿

JMS客户端倾向于实现的一种可能的优化是消息消耗缓冲区或“预取”。有时可以通过消息数量或字节大小来调整此缓冲区。

目的是防止使用者在每次收到消息时都去服务器,而不是批量提取多个消息。

在您有许多“快速使用者”(这些库可能采取的观点)的环境中,此预取设置为较高的默认值,以最大程度地减少往返次数。

但是,在消息使用者较慢的环境中,这种预取可能是一个问题。较慢的使用者正在阻止较快的使用者从预取的消息中消耗消息。在高度并发的环境中,这可能会迅速导致饥饿。

既然如此,它SQSConnectionFactory具有以下属性

SQSConnectionFactory sqsConnectionFactory = new SQSConnectionFactory( new ProviderConfiguration(), amazonSQSclient);
sqsConnectionFactory.setNumberOfMessagesToPrefetch(0);
Run Code Online (Sandbox Code Playgroud)

生产者饥饿(例如,通过JmsTemplate

这些JMS实现通常期望通过某些中介与代理接口。这些中介实际上缓存和重用连接,或者使用池化机制重用它们。在Java EE世界中,通常需要在Java EE服务器上使用JCA适配器或其他方法。

由于Spring JMS的工作方式,它期望ConnectionFactory存在一个中间代理来执行此缓存/池化。否则,当Spring JMS要连接到代理时,每次您想对代理进行操作时,它将尝试打开新的连接和会话(!)

为了解决这个问题,Spring提供了一些选择。最简单的是CachingConnectionFactory,它缓存一个Connection,并允许Session在其上打开多个Connection。将其添加到@Configuration上面的简单方法是:

@Bean
public ConnectionFactory connectionFactory(AmazonSQSClient amazonSQSclient) {

    SQSConnectionFactory sqsConnectionFactory = new SQSConnectionFactory(new ProviderConfiguration(), amazonSQSclient);

    // Doing the following is key!
    CachingConnectionFactory connectionfactory = new CachingConnectionFactory();
    connectionfactory.setTargetConnectionFactory(sqsConnectionFactory);
    // Set the #connectionfactory properties to your liking here...

    return connectionFactory;

}
Run Code Online (Sandbox Code Playgroud)

如果你想要的东西更看中的JMS池解决方案(将池ConnectionsMessageProducer除了多给你的SessionS),你可以使用合理的新PooledJMS项目JmsPoolConnectionFactory,等等,从他们的图书馆。


归档时间:

查看次数:

1063 次

最近记录:

6 年,10 月 前