Ste*_*ano 22 amazon-web-services amazon-kinesis aws-lambda
我对Amazon Kinesis很新,所以也许这只是我理解中的一个问题,但在AWS Lambda FAQ中它说:
每个分片严格序列化发送到您的AWS Lambda函数的Amazon Kinesis和DynamoDB Streams记录.这意味着如果将两个记录放在同一个分片中,Lambda保证在使用第二个记录调用Lambda函数之前,将使用第一个记录成功调用它.如果一个记录的调用超时,受到限制或遇到任何其他错误,Lambda将重试直到成功(或记录达到其24小时到期),然后再转到下一个记录.不保证跨不同分片的记录顺序,并且每个分片的处理并行发生.
我的问题是,如果由于某种原因某些格式错误的数据被生产者放入一个分片并且当Lambda函数选择它时它会出错,然后只是不断重试会发生什么?这意味着该特定分片的处理将被错误阻止24小时.
处理应用程序错误的最佳做法是将问题包装在自定义错误中,并将此错误与所有成功处理的记录一起发送到下游并让消费者处理它?当然,在一个不可恢复的错误导致程序像空指针一样崩溃的情况下,这仍然无济于事:我们将在接下来的24小时内再次回到阻塞重试循环.
az3*_*az3 31
不要过度思考,Kinesis只是一个队列.您必须成功使用记录(即从队列中弹出)才能继续下一个记录.就像FIFO堆栈一样.
适当的方法应该是:
顺便说一下,如果记录处理时间超过1分钟,很明显你做错了.因为Kinesis的设计目的是每秒处理数千条记录,所以您不应该为每个记录处理这么长的工作.
您要问的问题是队列系统的一般问题,有时称为"有毒消息".您必须在业务逻辑中处理它们才能安全.
http://www.cogin.com/articles/SurvivingPoisonMessages.php#PoisonMessages
Guy*_*Guy 15
这是关于在Kinesis中处理事件的常见问题,我将尝试为您提供一些建议您的Lambda函数来处理"损坏"数据的问题.由于最佳做法是将系统的分离部分写入Kinesis流并从Kinesis流中读取其他部分,因此通常会遇到此类问题.
首先,为什么会有这样的问题?
使用Kinesis处理您的事件是打破复杂系统的好方法,该系统同时执行前端处理(服务最终用户),同时/代码后端处理(分析事件),分成两个独立的部分你的系统.前端人员可以专注于他们的业务,而后端人员不需要将代码更改推送到前端,如果他们想要添加功能来为他们的分析用例提供服务.Kinesis是事件的缓冲区,它既可以打破同步需求,也可以简化业务逻辑代码.
因此,我们希望写入流的事件在其" 模式 "中是灵活的,如果前端团队希望更改事件格式,添加字段,删除字段,更改协议或加密密钥,它们应该是能够像他们想要的那样经常这样做.
现在,从流中读取的团队能够以有效的方式处理此类灵活事件,并且不会在每次发生此类更改时中断处理.因此,您的Lambda函数通常会看到无法处理的事件,并且" poison-pill "并不像您预期的那样罕见.
其次,你如何处理这些有问题的事件?
您的Lambda函数将获得一批要处理的事件.请注意,您不应该逐个参加活动,而是参加大批活动.如果您的批次太小,您将很快在流上获得大量滞后.
对于每个批处理,您将迭代事件,处理它们,然后在DynamoDB中检查批处理的最后序列ID.Lambda自动执行大部分这些步骤(请参阅此处:http://docs.aws.amazon.com/lambda/latest/dg/walkthrough-kinesis-events-adminuser-create-test-function.html):
console.log('Loading function');
exports.handler = function(event, context) {
console.log(JSON.stringify(event, null, 2));
event.Records.forEach(function(record) {
// Kinesis data is base64 encoded so decode here
payload = new Buffer(record.kinesis.data, 'base64').toString('ascii');
console.log('Decoded payload:', payload);
});
context.succeed();
};
Run Code Online (Sandbox Code Playgroud)
如果处理所有事件没有任何问题,这就是"快乐路径"中发生的情况.但是,如果您在批处理中遇到任何问题并且未通过成功通知" 提交 "事件,则批处理将失败,您将再次获得批处理中的所有事件.
现在您需要确定处理失败的原因是什么.
临时问题(限制,网络问题......) - 等待一秒再试几次是可以的.在许多情况下,问题将自行解决.
偶然问题(内存不足......) - 最好增加Lambda函数的内存分配或减小批量大小.在许多情况下,此类修改将解决该问题.
持续失败 - 这意味着您必须忽略有问题的事件(将其置于DLQ - 死信函队列中)或修改您的代码来处理它.
问题是识别代码中的失败类型并以不同方式处理它.您需要以识别它的方式编写Lambda代码(例如,异常类型)并做出不同的反应.
您可以使用与CloudWatch的集成将此类故障写入控制台并创建相关警报.您还可以使用CloudWatch Logs记录"死信号队列",并查看问题的根源.
| 归档时间: |
|
| 查看次数: |
14088 次 |
| 最近记录: |