Yal*_*ber 10 amazon-sqs node.js
我正在尝试编写一个nodejs sqs队列处理器.
"use strict";
var appConf = require('./config/appConf');
var AWS = require('aws-sdk');
AWS.config.loadFromPath('./config/aws_config.json');
var sqs = new AWS.SQS();
var exec = require('child_process').exec;
function readMessage() {
sqs.receiveMessage({
"QueueUrl": appConf.sqs_distribution_url,
"MaxNumberOfMessages": 1,
"VisibilityTimeout": 30,
"WaitTimeSeconds": 20
}, function (err, data) {
var sqs_message_body;
if (data.Messages) {
if (typeof data.Messages[0] !== 'undefined' && typeof data.Messages[0].Body !== 'undefined') {
//sqs msg body
sqs_message_body = JSON.parse(data.Messages[0].Body);
//make call to nodejs handler in codeigniter
exec('php '+ appConf.CI_FC_PATH +'/index.php nodejs_handler make_contentq_call "'+ sqs_message_body.contentq_cat_id+'" "'+sqs_message_body.cnhq_cat_id+'" "'+sqs_message_body.network_id+'"',
function (error, stdout, stderr) {
if (error) {
throw error;
}
console.log('stdout: ' + stdout);
if(stdout == 'Success'){
//delete message from queue
sqs.deleteMessage({
"QueueUrl" : appConf.sqs_distribution_url,
"ReceiptHandle" :data.Messages[0].ReceiptHandle
});
}
});
}
}
});
}
readMessage();
Run Code Online (Sandbox Code Playgroud)
上面的代码适用于队列中的单个消息.我应该如何编写此脚本以便它继续轮询队列中的消息,直到所有消息都被处理?我应该使用设置超时吗?
zav*_*avg 15
首先,您应该定义使用亚马逊提供的长轮询技术,据我所知,您已经在使用它,因为您"WaitTimeSeconds": 20在sqs.receiveMessage调用中有参数.我希望您不要忘记在AWS Web界面中配置它.
关于轮询消息 - 你可以使用不同的技术,包括计时器,但我认为最简单的只是readMessage()在你receiveMessage的(甚至exec是)的回调函数结束时调用你的函数.因此,队列中前一个消息的处理结束后,将立即开始处理(或等待)队列中的下一个消息.
更新:
至于我在你的新版本的代码中有很多readMessage()电话.我认为最好将其最小化以使代码更清晰,更易于维护.但是,如果您离开,例如,在主receiveMessage回调结束时只有一个调用,您将收到许多并行运行的PHP工作程序脚本 - 从性能的角度来看,它可能并不是那么糟糕 - 但是你会必须添加一些复杂的脚本来控制并行工作量.我认为你可以在exec回调中削减一些调用,尝试加入ifs并在主回调中加入调用.
"use strict";
var appConf = require('./config/appConf');
var AWS = require('aws-sdk');
AWS.config.loadFromPath('./config/aws_config.json');
var delay = 20 * 1000;
var sqs = new AWS.SQS();
var exec = require('child_process').exec;
function readMessage() {
sqs.receiveMessage({
"QueueUrl": appConf.sqs_distribution_url,
"MaxNumberOfMessages": 1,
"VisibilityTimeout": 30,
"WaitTimeSeconds": 20
}, function (err, data) {
var sqs_message_body;
if (data.Messages)
&& (typeof data.Messages[0] !== 'undefined' && typeof data.Messages[0].Body !== 'undefined')) {
//sqs msg body
sqs_message_body = JSON.parse(data.Messages[0].Body);
//make call to nodejs handler in codeigniter
exec('php '+ appConf.CI_FC_PATH +'/index.php nodejs_handler make_contentq_call "'+ sqs_message_body.contentq_cat_id+'" "'+sqs_message_body.cnhq_cat_id+'" "'+sqs_message_body.network_id+'"',
function (error, stdout, stderr) {
if (error) {
// error handling
}
if(stdout == 'Success'){
//delete message from queue
sqs.deleteMessage({
"QueueUrl" : appConf.sqs_distribution_url,
"ReceiptHandle" :data.Messages[0].ReceiptHandle
}, function(err, data){
});
}
readMessage();
});
}
}
readMessage();
});
}
readMessage();
Run Code Online (Sandbox Code Playgroud)
关于内存泄漏:我认为你不应该担心,因为下一次调用readMessage()发生在回调函数中 - 所以不是递归的,并且递归调用函数在调用receiveMessage()函数后立即将值返回到父函数.
| 归档时间: |
|
| 查看次数: |
12338 次 |
| 最近记录: |