防止Amazon SQS中重复邮件的最佳方法是什么?我有一个等待被抓取的域的SQS.在我向SQS添加新域之前,我可以检查保存的数据,看看它是否最近被抓取,以防止重复.
问题在于尚未抓取的域.例如,如果队列中有1000个域尚未被爬网.任何这些链接都可以再次添加,一次又一次.这使我的SQS膨胀成数十万条主要是重复的消息.
我该如何防止这种情况?有没有办法从队列中删除所有重复项?或者有没有办法在添加消息之前搜索队列中的消息?我觉得任何有SQS的人都必须经历过这个问题.
我可以看到的一个选项是,在将域添加到SQS之前,我是否存储了一些数据.但是,如果我必须将数据存储两次,那么首先会破坏使用SQS的重点.
我正在尝试利用 AWS CLI 和forbash 中的循环来迭代清除多个 SQS 消息队列。bash 脚本几乎按预期工作,我遇到的问题是每次 AWS CLI 发送请求时的返回值。当请求成功时,它返回一个空值并在命令行中打开一个交互式寻呼机。然后我必须手动输入q以退出交互式屏幕并允许for循环继续下一次迭代。当尝试清除大量队列时,这变得非常乏味和耗时。
有没有办法配置 AWS CLI 以禁用此交互式寻呼机为每个返回值弹出?或者将返回值通过管道传输到单独的文件而不是显示的方法?
我玩过配置不同的返回值类型(文本、yaml、JSON)但没有任何运气。此外,该--no-pagination参数不会改变行为。
这是我尝试运行的 bash 脚本示例:
for x in 1 2 3; do
aws sqs purge-queue --queue-url https://sqs.<aws-region>.amazonaws.com/<id>/<env>-$x-<queueName>.fifo;
done
Run Code Online (Sandbox Code Playgroud) 我正在使用Celery 2.4.6和django-celery 2.4.2.
当我根据这个问题的分辨率配置Celery使用Amazon SQS时:使用Amazon SQS的Celery
我在Django管理员的celerycam表中看不到任何内容.如果我切换回RabbitMQ,任务会再次开始显示.
我有很多(现在的40+)的SQS命名是这样的队列:"celeryev-92e068c4-9390-4c97-bc1d-13fd6e309e19",它看起来像他们可能与(一些上了年纪的人,甚至在他们的活动),但没有任何东西出现在数据库中,我看到celerycam日志中没有错误.
关于问题可能是什么或如何进一步调试的任何建议将非常感激.
我目前正在开发一个使用Symfony2开发的网站,我需要在Amazon SQS中发送消息.为了做到这一点,我加入了我的composer.json:
"aws/aws-sdk-php": "2.4.*"
Run Code Online (Sandbox Code Playgroud)
然后,当我尝试创建队列或列表队列时,我遇到403错误说:
编辑: 添加完整的错误消息
AWS错误代码:AccessDenied,状态代码:403,AWS请求ID:2fe34c11-7af8-5445-a768-070159a0953e,AWS错误类型:客户端,AWS错误消息:访问资源 https://sqs.us-west-2 .amazonaws.com /被拒绝.,User-Agent:aws-sdk-php2/2.4.11 Guzzle/3.7.4 curl/7.25.0 PHP/5.4.3
以下是我所做的示例代码:
$aws = Aws::factory(array(
'key' => 'my-key',
'secret' => 'my-secret',
'region' => 'us-west-2'
));
$sqs = $aws->get('sqs');
return new Response(var_dump($sqs->listQueues()));
Run Code Online (Sandbox Code Playgroud)
如果出现此错误,我该怎么办?
我知道,为了带来巨大的可扩展性和可靠性,SQS可以进行广泛的资源并行化.它使用冗余服务器甚至是小队列,甚至发布到队列的消息也作为多个副本冗余存储.这些是阻止它像RabbitMQ那样完全一次交付的因素.我看到甚至已删除的消息正在传递.
对开发人员的影响是他们需要为多个消息传递做好准备.亚马逊声称这不是一个问题,但事实上,开发人员必须使用一些同步构造,如数据库事务锁或dynamo-db条件写.这两者都降低了可扩展性.
鉴于重复传递问题,消息不可见时段功能如何保持?该邮件不保证不可见.如果开发者必须自己安排同步,那么隐形期间会带来什么好处.我已经看到消息重新传递,即使它们应该是隐形的.
这里我包括一些参考
我在SQS中有多条消息.以下代码始终只返回一个,即使有几十个可见(不在飞行中). setMaxNumberOfMessages我认为会允许多次一次消费..我误解了吗?
CreateQueueRequest createQueueRequest = new CreateQueueRequest().withQueueName(queueName);
String queueUrl = sqs.createQueue(createQueueRequest).getQueueUrl();
ReceiveMessageRequest receiveMessageRequest = new ReceiveMessageRequest(queueUrl);
receiveMessageRequest.setMaxNumberOfMessages(10);
List<Message> messages = sqs.receiveMessage(receiveMessageRequest).getMessages();
for (Message message : messages) {
// i'm a message from SQS
}
Run Code Online (Sandbox Code Playgroud)
我也试过使用withMaxNumberOfMessages而没有任何运气:
receiveMessageRequest.withMaxNumberOfMessages(10);
Run Code Online (Sandbox Code Playgroud)
我怎么知道队列中有消息?超过1?
Set<String> attrs = new HashSet<String>();
attrs.add("ApproximateNumberOfMessages");
CreateQueueRequest createQueueRequest = new CreateQueueRequest().withQueueName(queueName);
GetQueueAttributesRequest a = new GetQueueAttributesRequest().withQueueUrl(sqs.createQueue(createQueueRequest).getQueueUrl()).withAttributeNames(attrs);
Map<String,String> result = sqs.getQueueAttributes(a).getAttributes();
int num = Integer.parseInt(result.get("ApproximateNumberOfMessages"));
Run Code Online (Sandbox Code Playgroud)
以上总是先行,并给我一个int> 1
感谢您的输入
我有一个基于服务的应用程序,它使用具有多个队列和多个消费者的Amazon SQS.我这样做是为了实现基于事件的体系结构并解耦所有服务,其中不同的服务对其他系统的状态变化做出反应.例如:
我有很多问题:
我想我的问题是:我应该使用哪些模式来确保我可以在SQS中为单个队列拥有多个使用者,同时确保消息也可以被可靠地传递和删除.谢谢您的帮助.
event-based-programming amazon-sqs amazon-web-services microservices
我正在使用Storm 0.8.1从Amazon SQS队列中读取传入消息,并在执行此操作时获得一致的异常:
2013-12-02 02:21:38 executor [ERROR]
java.lang.RuntimeException: com.amazonaws.AmazonClientException: Unable to unmarshall response (ParseError at [row,col]:[1,1]
Message: JAXP00010001: The parser has encountered more than "64000" entity expansions in this document; this is the limit imposed by the JDK.)
at REDACTED.spouts.SqsQueueSpout.handleNextTuple(SqsQueueSpout.java:219)
at REDACTED.spouts.SqsQueueSpout.nextTuple(SqsQueueSpout.java:88)
at backtype.storm.daemon.executor$fn__3976$fn__4017$fn__4018.invoke(executor.clj:447)
at backtype.storm.util$async_loop$fn__465.invoke(util.clj:377)
at clojure.lang.AFn.run(AFn.java:24)
at java.lang.Thread.run(Thread.java:701)
Caused by: com.amazonaws.AmazonClientException: Unable to unmarshall response (ParseError at [row,col]:[1,1]
Message: JAXP00010001: The parser has encountered more than "64000" entity expansions in this document; this is the limit imposed by …Run Code Online (Sandbox Code Playgroud) 如果有人可以帮忙解释,我正试图了解我的工作流程中是否需要SQS.在我的应用程序中,当采取行动时,它会向SNS主题提交信息,该主题调用LAMBDA进行一些处理.这很好用.
当我在网上进行研究时,似乎人们在这个堆栈中使用SQS,而SNS会在SQS上放置信息,然后SQS会调用LAMBDA.
我想我想要了解的是需要SQS.添加什么值,或者换句话说,通过直接从SNS调用我的LAMBDA,我失去了什么?
在发送带有以下代码的SQS属性的消息之后(并在SQS控制台中检查是否所有内容都正确发布)...
messageRequest.addMessageAttributesEntry(
"attributeTest",
new MessageAttributeValue()
.withDataType("String")
.withStringValue("attributeTest 123"));
Run Code Online (Sandbox Code Playgroud)
我无法检索邮件中的任何邮件属性.因此,我所看到的只是"0属性".在Amazon SQS控制台中重新检查消息,消息 - 和属性 - 仍然存在.
// Message was previously checked in SQS console and contains
// an attribute named "attributeTest"
AmazonSQS sqs = ...
List<Message> messages = sqs.receiveMessage(receiveMessageRequest).getMessages();
for (Message : messages)
{
Map<String, MessageAttributeValue> attributes = message.getMessageAttributes();
System.out.println("" + attributes.size() + " attributes.");
}
Run Code Online (Sandbox Code Playgroud)
我在Java 1.7中使用Amazon SQS SDK v1.8和Play Framework 2.2.3.起初我认为它可能是SQS版本,但尝试升级到1.8.7无济于事.
这里找到的官方文档根本没有提供任何源代码来读取属性.也没有github搜索,堆栈溢出.我已经尝试了几个小时而没有任何成功.
谢谢你的帮助!
amazon-sqs ×10
java ×3
amazon-sns ×1
apache-storm ×1
aws-cli ×1
aws-lambda ×1
aws-sdk ×1
bash ×1
celery ×1
django ×1
jaxp ×1
symfony ×1