如何为DynamoDB将超过25个项目/行写入Table?

cod*_*ark 4 database amazon-web-services amazon-dynamodb amazon-redshift

我对Amazon DynamoDB相当陌生。我目前有20000行需要添加到表中。但是,根据我所阅读的内容,使用BatchWriteItem类和25个WriteRequests似乎一次最多只能写25行。有可能增加吗?如何一次写超过25行?目前大约需要15分钟才能写入所有20000行。谢谢。

Dav*_*ray 8

您最多只能在单个 BatchWriteItem 请求中发送 25 个项目,但您可以一次发送任意数量的 BatchWriteItem 请求。假设您已经提供了足够的写入吞吐量,您应该能够通过在多个线程/进程/主机之间拆分这 20k 行并将它们并行推送到数据库来显着加快速度。

对于这么小的数据集来说,它可能有点重量级,但您可以使用AWS Data Pipeline从 S3 中提取数据。它基本上自动化了创建 Hadoop 集群的过程,以从 S3 中提取您的数据,并在一堆并行的 BatchWriteItem 请求中将其发送到 DynamoDB。

  • 是的,您需要以某种方式将所有数据发送到 S3 才能使用数据管道。如果它已经全部在您的应用程序本地,那么首先将其上传到 S3,然后导入到 DynamoDB 几乎肯定会比直接并行上传到 DynamoDB 慢。如果您的应用程序是从其他地方获取它,而该地方可能是 S3,那么数据管道将使您不必编写代码来进行并行传输。 (2认同)

Gee*_*Lad 6

我正在寻找一些代码来使用JavaScript SDK进行此操作。我找不到它,所以我自己整理了一下。我希望这可以帮助其他人!

function multiWrite(table, data, cb) {
    var AWS = require('aws-sdk');
    var db = new AWS.DynamoDB.DocumentClient({region: 'us-east-1'});

    // Build the batches
    var batches = [];
    var current_batch = [];
    var item_count = 0;
    for(var x in data) {
        // Add the item to the current batch
        item_count++;
        current_batch.push({
            PutRequest: {
                Item: data[x]
            }
        });
        // If we've added 25 items, add the current batch to the batches array
        // and reset it
        if(item_count%25 == 0) {
            batches.push(current_batch);
            current_batch = [];
        }
    }
    // Add the last batch if it has records and is not equal to 25
    if(current_batch.length > 0 && current_batch.length != 25) batches.push(current_batch);

    // Handler for the database operations
    var completed_requests = 0;
    var errors = false;
    function handler(request) {
        return function(err, data) {
            // Increment the completed requests
            completed_requests++;

            // Set the errors flag
            errors = (errors) ? true : err;

            // Log the error if we got one
            if(err) {
                console.error(JSON.stringify(err, null, 2));
                console.error("Request that caused database error:");
                console.error(JSON.stringify(request, null, 2));
            }

            // Make the callback if we've completed all the requests
            if(completed_requests == batches.length) {
                cb(errors);
            }
        }
    }

    // Make the requests
    var params;
    for(x in batches) {
        // Items go in params.RequestItems.id array
        // Format for the items is {PutRequest: {Item: ITEM_OBJECT}}
        params = '{"RequestItems": {"' + table + '": []}}';
        params = JSON.parse(params);
        params.RequestItems[table] = batches[x];

        // Perform the batchWrite operation
        db.batchWrite(params, handler(params));
    }
}
Run Code Online (Sandbox Code Playgroud)

  • 请注意,这不会处理每个批次写入中的失败。例如,您可能需要处理从batchWrite 结果返回的UnprocessedItems。有关更多信息,请参阅 https://docs.aws.amazon.com/amazondynamodb/latest/APIReference/API_BatchWriteItem.html (3认同)