带有 aws-sdk receiveMessage Stall 的 Amazon SQS

Bar*_*yon 6 amazon-sqs amazon-web-services node.js

我将 aws-sdk 节点模块与(据我所知)批准的消息轮询方式一起使用。

这基本上总结为:

        sqs.receiveMessage({
            QueueUrl: queueUrl,
            MaxNumberOfMessages: 10,
            WaitTimeSeconds: 20
        }, function(err, data) {
            if (err) {
                logger.fatal('Error on Message Recieve');
                logger.fatal(err);
            } else {
                // all good
                if (undefined === data.Messages) {
                    logger.info('No Messages Object');
                } else if (data.Messages.length > 0) {
                    logger.info('Messages Count: ' + data.Messages.length);

                    var delete_batch = new Array();
                    for (var x=0;x<data.Messages.length;x++) {
                        // process
                        receiveMessage(data.Messages[x]);

                        // flag to delete

                        var pck = new Array();
                        pck['Id'] = data.Messages[x].MessageId;
                        pck['ReceiptHandle'] = data.Messages[x].ReceiptHandle;

                        delete_batch.push(pck);
                    }

                    if (delete_batch.length > 0) {
                        logger.info('Calling Delete');
                        sqs.deleteMessageBatch({
                            Entries: delete_batch,
                            QueueUrl: queueUrl
                        }, function(err, data) {
                            if (err) {
                                logger.fatal('Failed to delete messages');
                                logger.fatal(err);
                            } else {
                                logger.debug('Deleted recieved ok');
                            }
                        });
                    }
                } else {
                    logger.info('No Messages Count');
                }
            }
        });
Run Code Online (Sandbox Code Playgroud)

receiveMessage 是我的“如果我有足够的收集的消息,就用收集的消息做事”功能

有时,我的脚本会停止,因为我根本没有收到 Amazon 的响应,例如,队列中没有要使用的消息,而不是点击 WaitTimeSeconds 并发送“无消息对象”,回调是t叫。

(我正在把这个写给 Amazon Weirdness)

我要问的是检测和处理此问题的最佳方法是什么,因为我有一些代码可以停止对 receiveMessage 的并发调用。

此处建议的答案:Nodejs sqs 队列处理器也有防止并发消息请求查询的代码(假设它一次只获取一条消息)

我确实把整件事都包起来了

var running = false;
runMonitorJob = setInterval(function() {
    if (running) {
    } else {
        running = true;
        // call SQS.receive
    }
}, 500);
Run Code Online (Sandbox Code Playgroud)

(在删除循环之后运行 = false (不在它的回调中))

我的解决方案是

watchdogTimeout = setTimeout(function() {
    running = false;
}, 30000);
Run Code Online (Sandbox Code Playgroud)

但是,随着时间的推移,这肯定会留下一堆浮动的 sqs.receive 并因此留下大量内存吗?

(这个工作一直在运行,我周五让它继续运行,周六早上它停了下来,直到今天早上我手动重新启动了这个工作)

编辑:我见过挂起约 5 分钟然后突然收到消息等待时间为 20 秒的情况,它应该在 20 秒后抛出“无消息”。所以大约 10 分钟的 WatchDog 可能更实用(取决于其余的业务逻辑)

编辑:是的长轮询已在队列端配置。

编辑:这是在aws-sdk和 NodeJS v4.4.4 的(最新)v2.3.9 下

小智 1

我已经追查这个(或类似)问题几天了,这是我注意到的:

  • receiveMessage 调用最终确实返回,尽管仅在 120 秒后

  • 对 receiveMessage 的并发调用由 AWS.SDK 库序列化,因此并行进行多个调用不会产生任何效果。

  • receiveMessage 回调不会出错 - 事实上,在 120 秒过去后,它可能包含消息。

关于这个还能做什么?发生这种事情的原因有很多,其中一些/许多事情不一定能得到解决。答案是运行多个服务,每个服务都调用 receiveMessage 并在消息到来时对其进行处理 - SQS 支持这一点。在任何时候,其中一项服务可能会出现 120 秒的延迟,但其他服务应该能够继续正常运行。

我的特殊问题是,我有一些关键的单例服务无法承受 120 秒的停机时间。为此,我将研究 1) 使用 HTTP 而不是 SQS 将消息推送到我的服务中,或者 2) 在每个单例周围生成从属进程,以从 SQS 获取消息并将它们推送到服务中。