如何使用AWS JavaScript SDK(dynamoDB)处理UnprocessedItems?

kun*_*ruh 7 javascript lambda amazon-web-services node.js amazon-dynamodb

我正在尝试使用AWS Lambda函数来处理来自SendGrid的事件.据我了解,该事件将是一个包含可变数量的JSON对象的数组,每个对象代表一个给定的事件.我想使用batchWriteItem将这些事件写入DynamoDB并循环该过程,直到我没有返回任何UnprocessedItems.但是,我陷入了无限循环.这是我现在的代码:

console.log('Loading function');

var aws = require('aws-sdk');
var dynamo = new aws.DynamoDB();
params = {};

exports.handler = function(sg_event, context) {

    var items = [];
    for(var i = 0; i < sg_event.length; i++) {
        var obj = sg_event[i];
        var request = {
            PutRequest: {
                Item: {
                    email: { S: obj.email },
                    timestamp: { S: obj.timestamp.toString() },
                    sg_message_id: { S: obj.sg_message_id },
                    event: { S: obj.event }
                }
            }
        };
        items.push(request);
    }

    params = {
        RequestItems: {
            sendgrid_response: items
        }
    }

    do {
        dynamo.batchWriteItem( params, function(err, data) {
            if(err)
                context.fail(err);
            else
                params.RequestItems = data.UnprocessedItems;
        });
    } while(!isEmpty(params.RequestItems));
};

function isEmpty(obj) {
    return (Object.keys(obj).length === 0);
}
Run Code Online (Sandbox Code Playgroud)

我认为问题是试图在回调函数中设置params,但我不知道我应该怎么做...我知道我可以在原始回调中使用UnprocessedItems调用另一个batchWriteItem ,但我仍然需要能够根据需要多次运行该函数,以确保编写所有UnprocessedItems.如何正确循环batchWriteItem?

小智 7

@Daniela Miao,感谢分享解决方案。

我们可以在您发布的代码中添加一个代码块,这将避免来自 DynamoDB 的异常。这将在再次请求 DynamoDB 批量写入之前检查params.RequestItems是否有未处理的数据。

//db is AWS.DynamoDB Client
var processItemsCallback = function(err, data) {
  if (err) { 
     //fail
  } else {
    var params = {};
    params.RequestItems = data.UnprocessedItems;
    /*
    * Added Code block 
    */
    if(Object.keys(params.RequestItems).length != 0) {
      db.batchWriteItem(params, processItemsCallback);
    }
  }
};

db.batchWriteItem(/*initial params*/, processItemsCallback);
Run Code Online (Sandbox Code Playgroud)


Bin*_*Ren 6

这是我使用“await”语法的代码示例。所以这段代码必须在一个异步函数中。它会在重试之前进行随机延迟。

do {
    batchWriteResp = await dynamo.batchWriteItem({RequestItems:batchWriteItems}).promise()
    if (Object.keys(batchWriteResp.UnprocessedItems).length>0) {
        batchWriteItems = batchWriteResp.UnprocessedItems
        // delay a random time between 0.5~2.5 seconds
        const delay = Math.floor(Math.random() * 2000 + 500)
        await new Promise(resolve => setTimeout(resolve, delay));
    } else {
        break
    }
} while (true)
Run Code Online (Sandbox Code Playgroud)


小智 5

Nodejs是单线程的,它首先执行所有主要功能,因此你的while循环永远不会完成,回调将永远不会执行.

这是你如何做到的:

//db is AWS.DynamoDB Client
var processItemsCallback = function(err, data) {
  if (err) { 
     //fail
  } else {
    var params = {};
    params.RequestItems = data.UnprocessedItems;
    db.batchWriteItem(params, processItemsCallback);
  }
};

db.batchWriteItem(/*initial params*/, processItemsCallback);
Run Code Online (Sandbox Code Playgroud)

  • 是否有可能陷入无限循环? (4认同)
  • @PiminKonstantinKefaloukos 不,“UnprocessedItems”是正确的。请参阅[文档](http://docs.aws.amazon.com/AWSJavaScriptSDK/latest/AWS/DynamoDB.html#batchWriteItem-property) (2认同)