Ada*_*gyo 4 amazon-web-services amazon-kinesis aws-lambda
我有一个非常简单的 lambda 函数(nodeJS),它将接收到的事件放入 kinesis 流中。这是源代码:
'use strict';
const AWS = require('aws-sdk');
const kinesis = new AWS.Kinesis({apiVersion: '2013-12-02'});
exports.handler = async (event, context, callback) => {
let body = JSON.parse(event.body);
let receptionDate = new Date().toISOString();
let partitionKey = "pKey-" + Math.floor(Math.random() * 10);
// Response format needed for API Gateway
const formatResponse = (status, responseBody) => {
return {
statusCode: status,
headers: { "Content-Type": "application/json" },
body: JSON.stringify(responseBody)
}
}
// body.events is an array of events. Just add the reception date in each events.
for(let e of body.events) {
e.reception_date = receptionDate;
}
console.log("put In kinesis stream");
let kinesisParams = {
Data: new Buffer(JSON.stringify(body) + "\n"),
PartitionKey: partitionKey,
StreamName: 'event_test'
};
kinesis.putRecord(kinesisParams, (err, res) => {
console.log("Kinesis.putRecord DONE");
if(err) {
console.log("putRecord Error:", JSON.stringify(err));
callback(null, formatResponse(500, "Internal Error: " + JSON.stringify(err)));
} else {
console.log("putRecord Success:", JSON.stringify(res));
callback(null, formatResponse(200));
}
});
};
Run Code Online (Sandbox Code Playgroud)
执行这段代码时,在cloudwatch中的日志如下:
START RequestId: 5d4d7526-1a40-401f-8417-06435f0e5408 Version: $LATEST
2019-01-11T09:39:11.925Z 5d4d7526-1a40-401f-8417-06435f0e5408 put In kinesis stream
END RequestId: 5d4d7526-1a40-401f-8417-06435f0e5408
REPORT RequestId: 5d4d7526-1a40-401f-8417-06435f0e5408 Duration: 519.65 ms Billed Duration: 600 ms Memory Size: 128 MB Max Memory Used: 28 MB
Run Code Online (Sandbox Code Playgroud)
似乎没有调用 kinesis.putRecord... 我在 kinesis 流日志中看不到任何内容。我肯定在某个地方错了,但我不知道在哪里!
kinesis.putRecord 是一个异步操作,它在完成(无论成功还是有错误)时调用回调(第二个参数)。
asyncfunction 是一个返回 promise 的函数。Lambda 将在此承诺得到解决时完成其执行,即使还有其他异步操作尚未完成。由于您的函数不返回任何内容,因此在函数结束时立即解决承诺,因此执行将立即完成 - 无需等待您的异步kinesis.putRecord任务。
使用async处理程序时,您不需要调用回调。相反,你可以返回任何你想要的东西,或者抛出一个错误。Lambda 会得到它并分别响应。
所以你在这里有两个选择:
await的代码中没有任何内容,只需删除async. 在这种情况下,Lambda 正在等待事件循环变空(除非您明确更改 context.callbackWaitsForEmptyEventLoop)kinesis.putRecord:let result;
try {
result = await kinesis.putRecord(kinesisParams).promise();
} catch (err) {
console.log("putRecord Error:", JSON.stringify(err));
throw Error(formatResponse(500, "Internal Error: " + JSON.stringify(err));
}
console.log("putRecord Success:", JSON.stringify(result));
return formatResponse(200);
Run Code Online (Sandbox Code Playgroud)
在第二个选项中,lambda 将继续运行直到kinesis.putRecord完成。
有关这种情况下 Lambda 行为的更多信息,您可以/var/runtime/node_modules/awslambda/index.js在 lambda 容器中查看执行处理程序的主要代码。
| 归档时间: |
|
| 查看次数: |
6098 次 |
| 最近记录: |