如何将批量数据从csv导入到dynamodb

Hem*_*mar 28 csv amazon-web-services amazon-dynamodb

我正在尝试将__CODE__文件数据导入dynamodb.

请给我一个建议.

first_name  last_name
sri ram
Rahul   Dravid
JetPay  Underwriter
Anil Kumar  Gurram
Run Code Online (Sandbox Code Playgroud)

Has*_*que 12

使用哪种语言导入数据.我只是在nodejs中编写一个函数,可以将csv文件导入到dynamodb表中.它首先将整个csv解析为数组,将数组拆分为块(25),然后将batchWriteItem解析为表.

注意:DynamoDB在batchinsert中一次只允许1到25条记录.所以我们必须将数组拆分成块.

    var fs = require('fs');
    var parse = require('csv-parse');
    var async = require('async');

    var csv_filename = "YOUR_CSV_FILENAME_WITH_ABSOLUTE_PATH";

    rs = fs.createReadStream(csv_filename);
    parser = parse({
        columns : true,
        delimiter : ','
    }, function(err, data) {

        var split_arrays = [], size = 25;

        while (data.length > 0) {
            split_arrays.push(data.splice(0, size));
        }
        data_imported = false;
        chunk_no = 1;

        async.each(split_arrays, function(item_data, callback) {
            ddb.batchWriteItem({
                "TABLE_NAME" : item_data
            }, {}, function(err, res, cap) {
                console.log('done going next');
                if (err == null) {
                    console.log('Success chunk #' + chunk_no);
                    data_imported = true;
                } else {
                    console.log(err);
                    console.log('Fail chunk #' + chunk_no);
                    data_imported = false;
                }
                chunk_no++;
                callback();
            });

        }, function() {
            // run after loops
            console.log('all data imported....');

        });

    });
    rs.pipe(parser);
Run Code Online (Sandbox Code Playgroud)


bjf*_*her 11

您可以使用适用于此类的AWS Data Pipeline.您可以将文件上载csv到S3,然后使用Data Pipeline检索和填充DynamoDB表.他们有一个分步教程.

  • 真是个糟糕的答案!要仅执行CSV导入,他们建议弹出一个EMR集群,其中包含所有成本.... (10认同)
  • 我们决定在我们最新的项目中使用 Dynamo,我们每天都在后悔这个决定。我们也可能只是切换到 Postgres。 (3认同)

小智 8

作为一个没有权限创建数据管道的低dev,我不得不使用这个javascript.Hassan Sidique的代码略显过时,但这对我有用:

var fs = require('fs');
var parse = require('csv-parse');
var async = require('async');
const AWS = require('aws-sdk');
const dynamodbDocClient = new AWS.DynamoDB({ region: "eu-west-1" });

var csv_filename = "./CSV.csv";

rs = fs.createReadStream(csv_filename);
parser = parse({
    columns : true,
    delimiter : ','
}, function(err, data) {
    var split_arrays = [], size = 25;

    while (data.length > 0) {

        //split_arrays.push(data.splice(0, size));
        let cur25 = data.splice(0, size)
        let item_data = []

        for (var i = cur25.length - 1; i >= 0; i--) {
          const this_item = {
            "PutRequest" : {
              "Item": {
                // your column names here will vary, but you'll need do define the type
                "Title": {
                  "S": cur25[i].Title
                },
                "Col2": {
                  "N": cur25[i].Col2
                },
                "Col3": {
                  "N": cur25[i].Col3
                }
              }
            }
          };
          item_data.push(this_item)
        }
        split_arrays.push(item_data);
    }
    data_imported = false;
    chunk_no = 1;
    async.each(split_arrays, (item_data, callback) => {
        const params = {
            RequestItems: {
                "tagPerformance" : item_data
            }
        }
        dynamodbDocClient.batchWriteItem(params, function(err, res, cap) {
            if (err === null) {
                console.log('Success chunk #' + chunk_no);
                data_imported = true;
            } else {
                console.log(err);
                console.log('Fail chunk #' + chunk_no);
                data_imported = false;
            }
            chunk_no++;
            callback();
        });

    }, () => {
        // run after loops
        console.log('all data imported....');

    });

});
rs.pipe(parser);
Run Code Online (Sandbox Code Playgroud)

  • 我们可以得到一些意见吗?什么是cur25?经过一些小的修改后,这段代码并没有为我直接逐行循环,我不确定这里发生了什么。 (2认同)

Mat*_*lva 7

我为此创造了一个宝石。

现在你可以通过运行安装它gem install dynamocli,然后你可以使用命令:

dynamocli import your_data.csv --to your_table
Run Code Online (Sandbox Code Playgroud)

这是源代码的链接:https : //github.com/matheussilvasantos/dynamocli

  • 今天用它来将一个 dynamodb 表的内容迁移到另一个表,非常有用! (2认同)
  • 这在很大程度上是这个问题的最佳解决方案。谢谢你! (2认同)

a-h*_*a-h 6

我编写了一个工具来使用并行执行来执行此操作,该工具不需要在机器上安装任何依赖项或开发人员工具(它是用 Go 编写的)。

它可以处理:

  • 逗号分隔 (CSV) 文件
  • 制表符分隔 (TSV) 文件
  • 大文件
  • 本地文件
  • S3 上的文件
  • 在 AWS 中使用 AWS Step Functions 进行并行导入,每分钟导入超过 400 万行
  • 无依赖(不需要 .NET、Python、Node.js、Docker、AWS CLI 等)

它适用于 MacOS、Linux、Windows 和 Docker:https : //github.com/ah/ddbimport

示例截图

以下是我的测试结果,表明它可以使用 AWS Step Functions 更快地并行导入。

在此处输入图片说明

我将在 2020 年 5 月 15 日 BST 1155 的 AWS 社区峰会上更详细地描述该工具 - https://www.twitch.tv/awscomsum


drh*_*rhr 5

在获取我的代码之前,请注意一些关于在本地进行测试的注意事项

我建议使用DynamoDB的本地版本,以防万一,请在开始收费之前检查一下。在发布此消息之前,我做了一些小的修改,因此请务必使用对您有意义的任何方式进行测试。我已注释掉一个虚假的批量上传作业,您可以用它代替远程或本地的任何DynamoDB服务,以在stdout中验证这是否符合您的需求。

dynamodb-local

npmjs上查看dynamodb-local 或手动安装

如果您走了手动安装路线,则可以使用类似以下的方式启动dynamodb-local:

java -Djava.library.path=<PATH_TO_DYNAMODB_LOCAL>/DynamoDBLocal_lib/\
     -jar <PATH_TO_DYNAMODB_LOCAL>/DynamoDBLocal.jar\
     -inMemory\
     -sharedDb
Run Code Online (Sandbox Code Playgroud)

npm路线可能更简单。

dynamodb管理员

除此之外,请参见dynamodb-admin

我使用安装了dynamodb-admin npm i -g dynamodb-admin。然后可以使用以下命令运行它:

dynamodb-admin
Run Code Online (Sandbox Code Playgroud)

使用它们:

dynamodb-local默认为localhost:8000

dynamodb-admin是默认为的网页localhost:8001。启动这两项服务后,请localhost:8001在浏览器中打开以查看和操作数据库。

下面的脚本不会创建数据库。使用dynamodb-admin此。

功劳归于...

代码

  • 我对JS和Node.js的了解不如对其他语言的了解,因此请原谅任何JS辅助程序。
  • 您会注意到每组并行批处理有意地降低了900ms。这是一个骇人听闻的解决方案,我将其留在此处作为示例(并且由于懒惰,并且因为您没有付钱给我)。
  • 如果增加MAX_CONCURRENT_BATCHES,则需要基于WCU,项目大小,批处理大小和新的并发级别来计算适当的延迟量。
  • 另一种方法是打开Auto Scaling并为每个失败的批次实施指数补偿。就像我在下面的评论之一中提到的那样,在某些情况下,实际上不需要这样做,因为在给定WCU限制和数据大小的情况下,可以计算出实际上可以执行多少次写操作,而只需让您代码始终以可预测的速度运行。
  • 您可能想知道为什么我不只是让AWS开发工具包处理并发。好问题。可能会稍微简化一点。您可以通过将MAX_CONCURRENT_BATCHES应用于maxSocketsconfig选项,并修改创建批处理数组的代码,使其仅向前传递单个批处理,进行试验。
java -Djava.library.path=<PATH_TO_DYNAMODB_LOCAL>/DynamoDBLocal_lib/\
     -jar <PATH_TO_DYNAMODB_LOCAL>/DynamoDBLocal.jar\
     -inMemory\
     -sharedDb
Run Code Online (Sandbox Code Playgroud)


gad*_*icc 5

更新了2019 Javascript代码

我对上面的任何Javascript代码示例都不太满意。从上面的Hassan Siddique答案开始,我已经更新到最新的API,包括示例凭证代码,将所有用户配置移至顶部,在缺少时添加了uuid()并去除了空白字符串。

const fs = require('fs');
const parse = require('csv-parse');
const async = require('async');
const uuid = require('uuid/v4');
const AWS = require('aws-sdk');

// --- start user config ---

const AWS_CREDENTIALS_PROFILE = 'serverless-admin';
const CSV_FILENAME = "./majou.csv";
const DYNAMODB_REGION = 'eu-central-1';
const DYNAMODB_TABLENAME = 'entriesTable';

// --- end user config ---

const credentials = new AWS.SharedIniFileCredentials({
  profile: AWS_CREDENTIALS_PROFILE
});
AWS.config.credentials = credentials;
const docClient = new AWS.DynamoDB.DocumentClient({
  region: DYNAMODB_REGION
});

const rs = fs.createReadStream(CSV_FILENAME);
const parser = parse({
  columns: true,
  delimiter: ','
}, function(err, data) {

  var split_arrays = [],
    size = 25;

  while (data.length > 0) {
    split_arrays.push(data.splice(0, size));
  }
  data_imported = false;
  chunk_no = 1;

  async.each(split_arrays, function(item_data, callback) {
    const params = {
      RequestItems: {}
    };
    params.RequestItems[DYNAMODB_TABLENAME] = [];
    item_data.forEach(item => {
      for (key of Object.keys(item)) {
        // An AttributeValue may not contain an empty string
        if (item[key] === '')
          delete item[key];
      }

      params.RequestItems[DYNAMODB_TABLENAME].push({
        PutRequest: {
          Item: {
            id: uuid(),
            ...item
          }
        }
      });
    });

    docClient.batchWrite(params, function(err, res, cap) {
      console.log('done going next');
      if (err == null) {
        console.log('Success chunk #' + chunk_no);
        data_imported = true;
      } else {
        console.log(err);
        console.log('Fail chunk #' + chunk_no);
        data_imported = false;
      }
      chunk_no++;
      callback();
    });

  }, function() {
    // run after loops
    console.log('all data imported....');

  });

});
rs.pipe(parser);
Run Code Online (Sandbox Code Playgroud)