在RabbitMQ中,可以创建一个交换,然后将其绑定到多个队列,每个队列都有一个路由密钥.这使得消息传递体系结构如下所示:
message_x
/ | \
foo-msg_q bar-msg_q msg-logger_q
Run Code Online (Sandbox Code Playgroud)
客户端将消息发布到message_x交换机,该消息仅将具有路由密钥"foo"的消息路由到foo-msg_q队列,仅将具有路由密钥"bar"的消息路由到bar-msg_q队列,并将所有消息路由到msg-logger_q队列.
我无法确定如何在AWS中执行此操作.我的第一个想法是在各个队列上设置权限以接受基于主题的消息,但是权限条件的唯一可用字段是:
这些似乎都不会受到我发布到该message_x主题的任何消息的影响.
当使用Amazon Simple Notification Service扇出到多个简单队列服务队列时,是否可以做这样的事情,每个队列接收发布到该主题的消息子集?
Queue类上是否有一个方法可以指定队列配置中定义的特定连接?MySql有一个类似的选项,你可以在其中定义'mysql2',然后调用:
DB::connection('mysql2')->table('etc')->get();
Run Code Online (Sandbox Code Playgroud)
队列有类似的选择吗?
就像是:
Queue::connection('sqs2')->push('MyQueue', array('message' => $message));
Run Code Online (Sandbox Code Playgroud) 我有一台运行在rackspace上的服务器,它托管一个PHP Web应用程序.
PHP Web应用程序将接受表单提交,然后需要根据表单字段条目执行任务.
任务(让我们称之为生成元数据任务)需要相当多的处理时间.我想知道如何允许表单提交直接保存到数据库,并在后台运行生成元数据任务时立即向用户显示成功页面.
我已将"aws/aws-sdk-php": "~3.11"使用composer 安装到同一个Web应用程序中.
我的计划最初是这样的:
处理表单提交的代码
$result = $model->save($_POST);
// this code will send the information to either SQS or SNS
$awsClient->sendsMessage($_POST);
if ($result) {
$this->redirect('success.html');
}
Run Code Online (Sandbox Code Playgroud)
我已经阅读了AWS 所述的扇出架构.
我对扇出架构示例的问题(据我所知)是这样的:
我在这里找到了可能的解决方案
建议的解决方案是:
将消息发送到SNS主题.
SNS主题将同时发送SQS队列和我的Web应用程序.
我的Web应用程序在被触发后将轮询同一个SQS队列,该队列现在已连续排队消息,直到队列为空
我从中看到的缺点是我的Web应用程序将在队列本身有消息之前轮询队列.
使用AWS服务实现推送队列的最佳方法是什么?
我有一个AWS Lambda函数,它订阅DynamoDB流并配置了SQS死信队列(DLQ).我可以看到在管理控制台中配置了正确的队列.我也小心翼翼地sqs:SendMessage在我的DLQ上赋予我的功能权限.
订阅有效,但仍然"挂起"调用错误,就像没有配置DLQ一样.即,如果有消息导致未处理的异常,则该函数继续重试此消息,直到它从流中删除.我可以看到调用错误的数量增加,并且函数的Cloudwatch仪表板中没有显示DLQ错误.SQS队列保持为空.
我想要的是失败的消息转发到我的DLQ并且订阅继续到下一条消息.有任何想法吗?
正如Jonathan Seed在下面所说,DLQ目前不适用于基于流的订阅.AWS Support确认他们正在努力实现这一点.
在我详细解释问题之前,我告诉你我目前的做法.
我有一个运行setInterval()的js脚本.每个间隔,我都会调用SQS从队列中获取消息.如果有消息,那我就处理它.
所以,它将无限运行,直到我杀死进程.
我之前也构建了一个节点服务器(使用nodejs.org中的示例)
所以,我想知道的是,...而不是定期运行setInterval.有没有办法,如果SQS中有新消息,那么它会触发事件并处理消息?
我正在尝试编写一个nodejs sqs队列处理器.
"use strict";
var appConf = require('./config/appConf');
var AWS = require('aws-sdk');
AWS.config.loadFromPath('./config/aws_config.json');
var sqs = new AWS.SQS();
var exec = require('child_process').exec;
function readMessage() {
sqs.receiveMessage({
"QueueUrl": appConf.sqs_distribution_url,
"MaxNumberOfMessages": 1,
"VisibilityTimeout": 30,
"WaitTimeSeconds": 20
}, function (err, data) {
var sqs_message_body;
if (data.Messages) {
if (typeof data.Messages[0] !== 'undefined' && typeof data.Messages[0].Body !== 'undefined') {
//sqs msg body
sqs_message_body = JSON.parse(data.Messages[0].Body);
//make call to nodejs handler in codeigniter
exec('php '+ appConf.CI_FC_PATH +'/index.php nodejs_handler make_contentq_call "'+ sqs_message_body.contentq_cat_id+'" "'+sqs_message_body.cnhq_cat_id+'" "'+sqs_message_body.network_id+'"',
function (error, …Run Code Online (Sandbox Code Playgroud) 我们正在使用具有多个SQS队列订户的SNS主题实施AWS扇出模式.我想知道如果我成功地在SNS主题上发布消息会发生什么,但是由于某种原因它无法将其转发到队列中.SNS会重试,如果有的话,有没有办法控制它.
我找到了这个页面http://docs.aws.amazon.com/sns/latest/dg/DeliveryPolicies.html,它讨论了为SNS HTTP/HTTPS端点配置重试策略,但SQS上没有任何内容.
我有一个AWS SQS队列,其权限策略如下所示:
{
"Version": "2012-10-17",
"Id": "arn:aws:sqs:us-east-1:123123123:default_staging/SQSDefaultPolicy",
"Statement": [
{
"Sid": "Sid123123123123",
"Effect": "Allow",
"Principal": {
"AWS": "123123123123"
},
"Action": "SQS:*",
"Resource": "arn:aws:sqs:us-east-1:123123123123123:default_staging"
}
]
}
Run Code Online (Sandbox Code Playgroud)
不幸的是,我无法default_staging从我在不同地区的AWS服务器上向队列添加消息.
如果我将权限策略设置为全开,我可以default_staging从其他区域添加消息.
如何调整我的政策以允许SQS:*我所有地区的行动?
我有一个SQS Queue消息由多个主机读取.我希望在处理完队列中的所有消息后运行一些工作(业务逻辑).
如何检查队列是否为空?
是的,我可以检查ApproximateNumberOfMessages和ApproximateNumberOfMessagesNotVisible排队属性,但这些是近似数字.我想在没有消息时停止我的主机轮询队列中的消息,然后运行所需的作业.
有任何想法吗?谢谢
尝试使用SQS消息时,看到以下异常:
org.springframework.messaging.converter.MessageConversionException:
Cannot convert from [java.lang.String] to [com.example.demo.Foo] for GenericMessage [payload={}, headers={LogicalResourceId=my-queue, ApproximateReceiveCount=1, SentTimestamp=1529021258825, ReceiptHandle=xxxx, Visibility=org.springframework.cloud.aws.messaging.listener.QueueMessageVisibility@47ce6922, SenderId=xxxx, lookupDestination=my-queue, ApproximateFirstReceiveTimestamp=1529021264456, MessageId=xxxx}]
at org.springframework.messaging.handler.annotation.support.PayloadArgumentResolver.resolveArgument(PayloadArgumentResolver.java:144)
at org.springframework.messaging.handler.invocation.HandlerMethodArgumentResolverComposite.resolveArgument(HandlerMethodArgumentResolverComposite.java:116)
at org.springframework.messaging.handler.invocation.InvocableHandlerMethod.getMethodArgumentValues(InvocableHandlerMethod.java:137)
at org.springframework.messaging.handler.invocation.InvocableHandlerMethod.invoke(InvocableHandlerMethod.java:109)
at org.springframework.messaging.handler.invocation.AbstractMethodMessageHandler.handleMatch(AbstractMethodMessageHandler.java:515)
at org.springframework.messaging.handler.invocation.AbstractMethodMessageHandler.handleMessageInternal(AbstractMethodMessageHandler.java:473)
at org.springframework.messaging.handler.invocation.AbstractMethodMessageHandler.handleMessage(AbstractMethodMessageHandler.java:409)
at org.springframework.cloud.aws.messaging.listener.SimpleMessageListenerContainer.executeMessage(SimpleMessageListenerContainer.java:205)
at org.springframework.cloud.aws.messaging.listener.SimpleMessageListenerContainer$MessageExecutor.run(SimpleMessageListenerContainer.java:342)
at org.springframework.cloud.aws.messaging.listener.SimpleMessageListenerContainer$SignalExecutingRunnable.run(SimpleMessageListenerContainer.java:397)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
Run Code Online (Sandbox Code Playgroud)
Spring Boot的代码如下:
@Configuration
@EnableSqs
public class AmazonSqsConfiguration {
@Bean
public AmazonSQS amazonSQSAsync() {
return AmazonSQSAsyncClientBuilder.standard()
.withRegion(Regions.US_WEST_2)
.build();
}
}
@Service
public class MyService {
// Throws MessageConversionException
@SqsListener("my-queue")
public void listen(Foo payload) {
} …Run Code Online (Sandbox Code Playgroud) amazon-sqs ×10
amazon-sns ×3
node.js ×2
aws-lambda ×1
events ×1
laravel ×1
laravel-4 ×1
php ×1
spring-cloud ×1