我不明白何时使用SNS与SQS,为什么它们总是耦合在一起?
我有一个用例,会有数据流来,我不能以相同的速度消耗它,需要一个缓冲区.这可以使用SNS-SQS队列来解决.我开始知道Kinesis解决了同样的目的,那有什么区别?为什么我更喜欢(或不应该更喜欢)Kinesis?
我需要创建一个队列进行处理.队列本身的体积相对较小.每小时可能有大约1,000次写入.每个任务的执行可能大约需要一分钟,并且几乎在项目添加到队列后立即处理.
我有没有理由想要实现RabbitMQ,而不是像Amazon SQS那样现成的东西?应用程序需要自己的排队系统而不是SQS之类的原因是什么?
这是我正在尝试的简化方案:
http请求 - >(网关API + lambda A) - > SQS - >(lambda B ?????) - > DynamoDB
所以它应该如图所示:来自许多http请求的数据(例如,每秒高达500)被我的lambda函数A放入SQS队列.然后另一个函数B处理队列:最多读取10个项目(在某些期刊的基础上)并使用BatchWriteItem将它们写入DynamoDB.
问题是我无法弄清楚如何触发第二个lambda函数.它应该被频繁调用,每秒多次(或至少每秒一次),因为我需要队列中的所有数据进入DynamoDB ASAP(这就是为什么通过这里描述的预定事件调用lambda函数B 不是一个选项)
为什么我不想在没有SQS的情况下直接写入DynamoDB?
那对我来说完全避免使用SQS会很棒.我试图用SQS解决的问题是DynamoDB限制.甚至没有限制自己,而是在使用AWS SDK将数据写入DynamoDB时处理它的方式:当逐个编写记录并限制它们时,AWS SDK会静默重试写入,导致从http客户端的点开始增加请求处理时间视图.
所以我想暂时将数据存储在队列中,将响应"200 OK"发送回客户端,然后通过单独的函数处理队列,使用一个DynamoDB的BatchWriteItem调用写入多个记录(返回未处理的项目而不是自动重试)限制).我甚至宁愿丢失一些记录,而不是增加接收和存储在DynamoDB中的记录之间的延迟
UPD:如果有人感兴趣,我已经找到了如何在节流的情况下使aws-sdk跳过自动重试:有一个特殊参数maxRetries.无论如何,将按照以下建议使用Kinesis
将消息从死信队列移回Amazon SQS中的原始队列的最佳做法是什么?
可不可能是
或者有更简单的方法吗?
此外,AWS最终会在控制台中使用一个工具来移除DLQ上的消息吗?
我对使用Amazon SQS开发一些开发很感兴趣,也许是SimpleDB,我的问题是,是否存在任何模仿功能的开源解决方案,仅用于开发目的.我已经遇到了用于创建EC-esque云的Eucalyptus项目(http://open.eucalyptus.com).
谷歌我没有取得任何成功,我怀疑这是因为入门费用如此便宜,但是,有没有人知道这样的事情?
我想使用Amazon SQS作为Celery支持的经纪人.有关于Kombu的SQS传输实现,Celery依赖它.但是没有足够的文档来使用它,所以我找不到如何在Celery上配置SQS.是否有人成功在Celery上配置SQS?
对不起,如果是新手问题.但我试图了解我应该使用什么.据我所知,卡夫卡是:
Apache Kafka是一个分布式发布 - 订阅消息传递系统.
而SNS也是pub/sub系统.
我的目标是在AWS上使用一些队列消息传递系统,该应用程序将分布在少数服务器上.顺便说一句(主要语言是Python).因为它是在亚马逊上,我的第一个想法是使用SNS和SQS.但是我看到很多人在AWS上使用Kafka.一个又一个有什么优势?
我以非常简单的方式使用Amazon SQS队列.通常,消息被写入并立即可见和读取.有时,会写入一条消息,并在队列中保持几分钟的飞行中(不可见).我可以从控制台看到它.接收消息等待时间为0,默认可见性为5秒.它将保持这种状态几分钟,或者直到新消息被写入以某种方式释放它.几秒钟的延迟是可以的,但超过60秒是不行的.
有8个读取器线程总是很长的轮询,所以它不是没有尝试读取它的东西,它们是.
编辑:要清楚,消费者读取都没有返回任何消息,无论控制台是否打开,它都会发生.在这种情况下,只涉及一条消息,它只是位于消费者看不到的队列中.
有没有其他人看到这种行为,我可以做些什么来改善它?
这是我正在使用的java的sdk:
<dependency>
<groupId>com.amazonaws</groupId>
<artifactId>aws-java-sdk</artifactId>
<version>1.5.2</version>
</dependency>
Run Code Online (Sandbox Code Playgroud)
这是执行读取的代码(max = 10,maxwait = 0启动配置):
void read(MessageConsumer consumer) {
List<Message> messages = read(max, maxWait);
for (Message message : messages) {
if (tryConsume(consumer, message)) {
delete(message.getReceiptHandle());
}
}
}
private List<Message> read(int max, int maxWait) {
AmazonSQS sqs = getClient();
ReceiveMessageRequest rq = new ReceiveMessageRequest(queueUrl);
rq.setMaxNumberOfMessages(max);
rq.setWaitTimeSeconds(maxWait);
List<Message> messages = sqs.receiveMessage(rq).getMessages();
if (messages.size() > 0) {
LOG.info("read {} messages from SQS queue",messages.size());
}
return messages;
}
Run Code Online (Sandbox Code Playgroud)
当发生这种情况时,"read .."的日志行永远不会出现,它是什么原因导致我进入控制台并查看消息是否存在,它是.
Kafka 和 SQS 一样吗?我发现两者都是消息队列系统并且是基于事件的。它们的目的相同吗?如果不是,它们有何不同?