我正在尝试编写一个nodejs sqs队列处理器.
"use strict";
var appConf = require('./config/appConf');
var AWS = require('aws-sdk');
AWS.config.loadFromPath('./config/aws_config.json');
var sqs = new AWS.SQS();
var exec = require('child_process').exec;
function readMessage() {
sqs.receiveMessage({
"QueueUrl": appConf.sqs_distribution_url,
"MaxNumberOfMessages": 1,
"VisibilityTimeout": 30,
"WaitTimeSeconds": 20
}, function (err, data) {
var sqs_message_body;
if (data.Messages) {
if (typeof data.Messages[0] !== 'undefined' && typeof data.Messages[0].Body !== 'undefined') {
//sqs msg body
sqs_message_body = JSON.parse(data.Messages[0].Body);
//make call to nodejs handler in codeigniter
exec('php '+ appConf.CI_FC_PATH +'/index.php nodejs_handler make_contentq_call "'+ sqs_message_body.contentq_cat_id+'" "'+sqs_message_body.cnhq_cat_id+'" "'+sqs_message_body.network_id+'"',
function (error, …Run Code Online (Sandbox Code Playgroud) 我有一种情况,当一个msg失败,我想使用python boto包重放那个具有最高优先级的msg,所以他将被带走.如果我没错,SQS队列不支持优先级队列,所以我想实现一些简单的东西.
重要提示:当msg失败时,我不再拥有消息对象,我只保留receipt_handle,这样我就可以删除消息(如果有超过x次重试)/更改超时可见性,以便将他推回队列.
谢谢!
我正在尝试传递然后将带有属性的消息检索到AWS SQS中.即使我可以通过管理控制台查看消息的属性,但我无法使用boto3获取它们,总是得到无.更改"AttributeNames"并没有什么不同.可以检索邮件正文.
import boto3
sqs = boto3.resource('sqs', region_name = "us-west-2")
queue = sqs.get_queue_by_name(QueueName='test')
queue.send_message(MessageBody = "LastEvaluatedKey",
MessageAttributes ={
'class_number':{
"StringValue":"Value value ",
"DataType":"String"
}
}
)
messages = queue.receive_messages(
MaxNumberOfMessages=1,
AttributeNames=['All']
)
for msg in messages:
print(msg.message_attributes) # returns None
print(msg.body) # returns correct value
Run Code Online (Sandbox Code Playgroud) 在下面的示例中,我将max和core pool size设置为1.但是没有消息正在处理中.当我启用调试日志时,我能够看到从SQS中提取的消息,但我猜它没有被处理/删除.但是,当我将核心和最大池大小增加到2时,似乎会处理消息.
编辑
我相信Spring可能为接收器分配一个线程,该接收器从队列中读取数据,因此无法将线程分配给正在处理消息的侦听器.当我将corepoolsize增加到2时,我看到消息正在从队列中读取.当我添加另一个监听器(用于死信队列)时,我遇到了同样的问题 - 由于没有处理消息,因此2个线程是不够的.当我将corepoolsize增加到3时,它开始处理消息.我假设在这种情况下,分配了1个线程来读取队列中的消息,并为2个侦听器分配了1个线程.
@Configuration
public class SqsListenerConfiguration {
@Bean
@ConfigurationProperties(prefix = "aws.configuration")
public ClientConfiguration clientConfiguration() {
return new ClientConfiguration();
}
@Bean
@Primary
public AWSCredentialsProvider awsCredentialsProvider() {
ProfileCredentialsProvider credentialsProvider = new ProfileCredentialsProvider("credential");
try {
credentialsProvider.getCredentials();
System.out.println(credentialsProvider.getCredentials().getAWSAccessKeyId());
System.out.println(credentialsProvider.getCredentials().getAWSSecretKey());
} catch (Exception e) {
throw new AmazonClientException(
"Cannot load the credentials from the credential profiles file. " +
"Please make sure that your credentials file is at the correct " +
"location (~/.aws/credentials), and is in valid format.",
e); …Run Code Online (Sandbox Code Playgroud) AWS博客称它支持扇出消息传递模式.为此,我将SQS队列订阅到SNS主题.
现在我的问题是,SNS向SQS发布消息是否可靠?如同,在每个帖子发送到SNS时,是否保证SNS将始终成功地在订阅的所有SQS队列上发布通知(消息)?
嗨,我是laravel 4的新手,在我的本地机器上配置AWS SQS时遇到麻烦.我需要在AWS队列中推送一些作业并按顺序执行它们.
我在app/config/queue.php中设置了所需的值
'sqs' => array(
'driver' => 'sqs',
'key' => 'XXXXXX',
'secret' => 'XXXXXX',
'queue' => 'https://sqs.us-west-2.amazonaws.com/XXXXXX/myqueue',
'region' => 'us-west-2',
),
Run Code Online (Sandbox Code Playgroud)
并且还覆盖了app/config/local/queue.php中的队列值
$queue = include __DIR__ . "/../queue.php";
$queue['connections']['sqs']['queue'] = 'https://sqs.us-west-2.amazonaws.com/XXXXXXX/mylocalqueue';
return $queue;
Run Code Online (Sandbox Code Playgroud)
我还更改了bootstrap/start.php的更新,将环境设置为本地环境
<?php
$env = $app->detectEnvironment(array(
'local' => array('my-machine-name'),
));
Run Code Online (Sandbox Code Playgroud)
我已将控制器功能中的作业推送到队列中,如下所示
public function pus_aws($data){
$queue = $this->app['queue'];
$queue->push('\ControllerName@ActionName', array(
'data' => $data,
));
return true;
}
Run Code Online (Sandbox Code Playgroud)
但它没有用.有人可以帮我推动和运行排队的工作吗?
从Amazon SQS FAQ页面:
Amazon SQS不保证对Amazon SQS队列中的消息进行FIFO访问,主要是因为Amazon SQS的分布式特性.如果您需要特定的消息排序,则应设计应用程序来处理它.
我的问题是 - 如何以这种方式设计应用程序?
我目前正在将AWS的Elastic Beanstalk工作者用于我的队列,可用于触发自动扩展的指标非常通用(CPU,Net in,Net out等).
我很想知道是否可以根据连接到worker的队列状态使用触发器 - 特别是根据过去X分钟队列中可用消息的平均数量添加或删除实例?
amazon-ec2 amazon-sqs amazon-web-services amazon-elastic-beanstalk
我想在AWS S3存储桶中设置以下事件/通知配置:收到文件(s3:ObjectCreated:*)后,将触发两个目标:
当我尝试通过AWS控制台设置配置时,我收到以下错误消息:
Configurations overlap. Configurations on the same bucket cannot share a common event type. : s3:ObjectCreated:*, s3:ObjectCreated:*
Run Code Online (Sandbox Code Playgroud)
我尝试按照用户指南的建议通过AWS SDK(Java)设置配置,但结果类似:
Caught an AmazonServiceException, which means your request made it to Amazon S3, but was rejected with an error response for some reason.
Error Message: Configurations overlap. Configurations on the same bucket cannot share a common event type. (Service: Amazon S3; Status Code: 400; Error Code: InvalidArgument; Request ID: A0E8738522EA218F)
HTTP Status Code: 400
AWS Error Code: InvalidArgument …Run Code Online (Sandbox Code Playgroud) 我有一个AWS SQS队列,我将设置一个Lambda函数触发器,为每个添加到队列的项目运行Lambda函数以进行一些处理工作.
处理的一个步骤是访问API端点,以便为添加到队列的每个项目返回一些数据,然后将其存储在DynamoDB表中.为了管理成本并控制这个过程,我想要限制在一段时间内调用这个Lambda函数的次数.
例如,我希望此函数每天最多运行100次,以免压倒DynamoDB表容量或API端点.将它限制在一起也是很好的,它将同时运行最多5个并发动作,再次运行之间有1秒的延迟.这种类型的控制允许我直接将函数映射到DynamoDB表限制,以确保我不会超过容量,并且我遵守任何API速率限制.
我查看了AWS Lambda管理并发.特别是"功能级别并发执行限制"部分.但是这部分似乎没有解决每天100次限制,或者运行下一个功能之间的1秒延迟.
我也知道我可以通过限制我一次(或每天)放入队列的项目数来限制它.但是由于我正在计划这个系统,这会增加很多我希望避免的复杂性.
有没有办法使用AWS SQS和Lambda来实现这一点并限制这些Lambda函数?
amazon-sqs ×10
aws-lambda ×2
amazon-ec2 ×1
amazon-s3 ×1
amazon-sns ×1
boto ×1
boto3 ×1
fifo ×1
java ×1
laravel ×1
node.js ×1
python ×1
queue ×1
spring ×1
spring-cloud ×1