lk7*_*777 3 java amazon-sqs amazon-web-services
我一直在尝试使用AWS SDK for Java从SQS队列中检索所有消息的几种方法无济于事.我已经了解了AWS SQS的分布式特性,并且消息存储在不同的服务器上.但我不明白为什么这个架构不会被最终用户隐藏.我需要在Java代码中应用哪些技巧来检索所有消息,并且100%确定没有人错过?
我试着用"长轮询":
ReceiveMessageRequest receiveMessageRequest = new ReceiveMessageRequest(myQueueUrl);
List<Message> messages = sqs.receiveMessage(receiveMessageRequest).getMessages();
for (Message message : messages) {
System.out.println(" Message");
System.out.println(" MessageId: " + message.getMessageId());
System.out.println(" ReceiptHandle: " + message.getReceiptHandle());
System.out.println(" MD5OfBody: " + message.getMD5OfBody());
System.out.println(" Body: " + message.getBody());
for (Entry<String, String> entry : message.getAttributes().entrySet()) {
System.out.println(" Attribute");
System.out.println(" Name: " + entry.getKey());
System.out.println(" Value: " + entry.getValue());
}
}
System.out.println();
Run Code Online (Sandbox Code Playgroud)
这与请求批处理/客户端缓冲:
// Create the basic Amazon SQS async client
AmazonSQSAsync sqsAsync = new AmazonSQSAsyncClient();
// Create the buffered client
AmazonSQSAsync bufferedSqs = new AmazonSQSBufferedAsyncClient(sqsAsync);
CreateQueueRequest createRequest = new CreateQueueRequest().withQueueName("MyTestQueue");
CreateQueueResult res = bufferedSqs.createQueue(createRequest);
SendMessageRequest request = new SendMessageRequest();
String body = "test message_" + System.currentTimeMillis();
request.setMessageBody( body );
request.setQueueUrl(res.getQueueUrl());
SendMessageResult sendResult = bufferedSqs.sendMessage(request);
ReceiveMessageRequest receiveRq = new ReceiveMessageRequest()
.withMaxNumberOfMessages(10)
.withQueueUrl(queueUrl);
ReceiveMessageResult rx = bufferedSqs.receiveMessage(receiveRq);
List<Message> messages = rx.getMessages();
for (Message message : messages) {
System.out.println(" Message");
System.out.println(" MessageId: " + message.getMessageId());
System.out.println(" ReceiptHandle: " + message.getReceiptHandle());
System.out.println(" MD5OfBody: " + message.getMD5OfBody());
System.out.println(" Body: " + message.getBody());
for (Entry<String, String> entry : message.getAttributes().entrySet()) {
System.out.println(" Attribute");
System.out.println(" Name: " + entry.getKey());
System.out.println(" Value: " + entry.getValue());
}
}
Run Code Online (Sandbox Code Playgroud)
但我仍然无法检索所有消息.
任何的想法?
AWS论坛对我的帖子保持沉默.
从SQS队列接收消息时,需要重复呼叫sqs:ReceiveMessage.
在每次调用时sqs:ReceiveMessage,您将从队列中获得0个或更多消息,您需要迭代这些消息.对于每条消息,您还需要sqs:DeleteMessage在处理完每条消息后调用以从队列中删除消息.
在上面的"长轮询"示例周围添加一个循环以接收所有消息.
for (;;) {
ReceiveMessageRequest receiveMessageRequest = new ReceiveMessageRequest(myQueueUrl);
List<Message> messages = sqs.receiveMessage(receiveMessageRequest).getMessages();
for (Message message : messages) {
System.out.println(" Message");
System.out.println(" MessageId: " + message.getMessageId());
System.out.println(" ReceiptHandle: " + message.getReceiptHandle());
System.out.println(" MD5OfBody: " + message.getMD5OfBody());
System.out.println(" Body: " + message.getBody());
for (Entry<String, String> entry : message.getAttributes().entrySet()) {
System.out.println(" Attribute");
System.out.println(" Name: " + entry.getKey());
System.out.println(" Value: " + entry.getValue());
}
}
System.out.println();
}
Run Code Online (Sandbox Code Playgroud)
另请注意,您可能会多次收到相同的消息.因此,允许您的工作"重新处理"相同的消息,或检测重复的消息.
小智 6
我也面临同样的问题 - 只有一条消息被返回,然后我尝试了 receiveMessageRequest.setMaxNumberOfMessages(10) ,这将帮助我在循环中检索 10 条消息,
因为我的队列有超过 500 条记录,所以我所做的是
List<String> messagelist = new ArrayList<>();
try
{
AmazonSQS sqs = new AmazonSQSClient(credentials);
Region usWest2 = Region.getRegion(Regions.US_WEST_2);
sqs.setRegion(usWest2);
boolean flag = true;
while(flag)
{
ReceiveMessageRequest receiveMessageRequest = new ReceiveMessageRequest(queuename);
receiveMessageRequest.setMaxNumberOfMessages(number_of_message_);
receiveMessageRequest.withMaxNumberOfMessages(number_of_message_).withWaitTimeSeconds(wait_time_second_);
List<Message> messages = sqs.receiveMessage(receiveMessageRequest).getMessages();
for (Message message : messages)
{
// System.out.println(" Body: " + message.getBody());
messagelist.add( message.getBody());
String messageReceiptHandle = message.getReceiptHandle();
sqs.deleteMessage(new DeleteMessageRequest().withQueueUrl(queuename).withReceiptHandle(messageReceiptHandle));
}
if(messages.size()==0)
{
flag = false;
}
}
}
catch (AmazonServiceException ase) {
ase.printStackTrace();
} catch (AmazonClientException ace) {
ace.printStackTrace();
}
finally {
return messagelist ;
}
Run Code Online (Sandbox Code Playgroud)
我正在从 SQS 读取记录,然后将其保存到一个字符串列表中,然后从队列中删除该记录。
所以最后我会把队列中的所有数据放在一个列表中