亚马逊sqs的异步消费者

use*_*993 4 java asynchronous amazon-sqs jmstemplate

我不熟悉队列.我能够成功发布消息并同步接收它们但是,我现在正试图异步.

sqs提供的参考链接建议使用jmsclient包装器.如果您已经有一个集成到jms客户端的代码,那么该链接还会提到使用它.

http://docs.aws.amazon.com/AWSSimpleQueueService/latest/SQSDeveloperGuide/jmsclient.html#samples

但我重新开始,我引用这个例子同步发送和recv消息.

https://github.com/aws/aws-sdk-java/blob/master/src/samples/AmazonSimpleQueueService/SimpleQueueServiceSample.java

我可以使用相同的代码,但使用消息监听器实现它吗?任何代码示例将不胜感激.

rba*_*rni 6

Amazon SQS开发人员指南中有关使用JMS与Amazon SQS的部分中有一个代码示例,该部分示例说明了如何使用JMS异步接收消息.

首先,实现MessageListener接口:

class MyListener implements MessageListener {

    @Override
    public void onMessage(Message message) {
        try {
            // Cast the received message as TextMessage and print the text to screen.
            if (message != null) {
                System.out.println("Received: " + ((TextMessage) message).getText());
            }
        } catch (JMSException e) {
            e.printStackTrace();
        }
    }
}
Run Code Online (Sandbox Code Playgroud)

然后将其设置为MessageConsumer的MessageListener:

// Create a consumer for the 'TestQueue'.
MessageConsumer consumer = session.createConsumer(queue);

// Instantiate and set the message listener for the consumer.
consumer.setMessageListener(new MyListener());

// Start receiving incoming messages.
connection.start();

// Wait for 1 second. The listener onMessage() method will be invoked when a message is received.
Thread.sleep(1000);
Run Code Online (Sandbox Code Playgroud)

  • 我从哪里获取对象`queue` 以传递给`.createConsumer` 方法?我使用现有队列,不想创建新队列。 (2认同)

doe*_*uvc 6

您可以使用sqslistenerSpringCloud 框架中的注释。如果您正在使用 和 开发应用程序并且SpringAWS没有使用Spring Cloud,那么现在是您切换的好时机。

以下是使用注释从 SQS 异步接收消息的示例代码sqslistener。一件好事是你必须几乎零配置才能使用它:

import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.cloud.aws.messaging.listener.SqsMessageDeletionPolicy;
import org.springframework.cloud.aws.messaging.listener.annotation.SqsListener;
import org.springframework.stereotype.Component;
import com.example.my.RecoverableException;

@Component
@Slf4j
public class CustomMessageQueue {

    @SqsListener(value = "${build_request_queue.name}", deletionPolicy = SqsMessageDeletionPolicy.ON_SUCCESS)
    public void receive(String message) {
        try {
            // write message processing here
        } catch (RecoverableException e) {
            // handle errors here for which message from queue should not be deleted
            // throwing an exception will make receive method fail and hence message does not get deleted
            throw e;

        } catch (Exception e) {
            // suppress exceptions for which message should be deleted.
        }
    }
}
Run Code Online (Sandbox Code Playgroud)

注释的伟大之处sqslistener在于它的deletionPolicy。因此您可以决定何时删除来自 SQS 的消息。


Ole*_*lin -1

SQS 代表“简单队列服务”。从字面上看,它的意思很简单。因此,它不支持 JMS 的一些细节,特别是异步侦听器。

我写了一篇关于此主题的博客文章:http://thedulinreport.com/2015/05/09/guaranteeing-delivery-of-messages-with-aws-sqs/

基本上,您需要做的是在无限循环中编写一个轮询器,但您希望对此保持聪明 - 您不想继续进行太多轮询,因为您需要按请求付费。