Hem*_*mar 28 csv amazon-web-services amazon-dynamodb
我正在尝试将__CODE__文件数据导入dynamodb.
请给我一个建议.
first_name last_name
sri ram
Rahul Dravid
JetPay Underwriter
Anil Kumar Gurram
Run Code Online (Sandbox Code Playgroud)
Has*_*que 12
使用哪种语言导入数据.我只是在nodejs中编写一个函数,可以将csv文件导入到dynamodb表中.它首先将整个csv解析为数组,将数组拆分为块(25),然后将batchWriteItem解析为表.
注意:DynamoDB在batchinsert中一次只允许1到25条记录.所以我们必须将数组拆分成块.
var fs = require('fs');
var parse = require('csv-parse');
var async = require('async');
var csv_filename = "YOUR_CSV_FILENAME_WITH_ABSOLUTE_PATH";
rs = fs.createReadStream(csv_filename);
parser = parse({
columns : true,
delimiter : ','
}, function(err, data) {
var split_arrays = [], size = 25;
while (data.length > 0) {
split_arrays.push(data.splice(0, size));
}
data_imported = false;
chunk_no = 1;
async.each(split_arrays, function(item_data, callback) {
ddb.batchWriteItem({
"TABLE_NAME" : item_data
}, {}, function(err, res, cap) {
console.log('done going next');
if (err == null) {
console.log('Success chunk #' + chunk_no);
data_imported = true;
} else {
console.log(err);
console.log('Fail chunk #' + chunk_no);
data_imported = false;
}
chunk_no++;
callback();
});
}, function() {
// run after loops
console.log('all data imported....');
});
});
rs.pipe(parser);
Run Code Online (Sandbox Code Playgroud)
bjf*_*her 11
您可以使用适用于此类的AWS Data Pipeline.您可以将文件上载csv到S3,然后使用Data Pipeline检索和填充DynamoDB表.他们有一个分步教程.
小智 8
作为一个没有权限创建数据管道的低dev,我不得不使用这个javascript.Hassan Sidique的代码略显过时,但这对我有用:
var fs = require('fs');
var parse = require('csv-parse');
var async = require('async');
const AWS = require('aws-sdk');
const dynamodbDocClient = new AWS.DynamoDB({ region: "eu-west-1" });
var csv_filename = "./CSV.csv";
rs = fs.createReadStream(csv_filename);
parser = parse({
columns : true,
delimiter : ','
}, function(err, data) {
var split_arrays = [], size = 25;
while (data.length > 0) {
//split_arrays.push(data.splice(0, size));
let cur25 = data.splice(0, size)
let item_data = []
for (var i = cur25.length - 1; i >= 0; i--) {
const this_item = {
"PutRequest" : {
"Item": {
// your column names here will vary, but you'll need do define the type
"Title": {
"S": cur25[i].Title
},
"Col2": {
"N": cur25[i].Col2
},
"Col3": {
"N": cur25[i].Col3
}
}
}
};
item_data.push(this_item)
}
split_arrays.push(item_data);
}
data_imported = false;
chunk_no = 1;
async.each(split_arrays, (item_data, callback) => {
const params = {
RequestItems: {
"tagPerformance" : item_data
}
}
dynamodbDocClient.batchWriteItem(params, function(err, res, cap) {
if (err === null) {
console.log('Success chunk #' + chunk_no);
data_imported = true;
} else {
console.log(err);
console.log('Fail chunk #' + chunk_no);
data_imported = false;
}
chunk_no++;
callback();
});
}, () => {
// run after loops
console.log('all data imported....');
});
});
rs.pipe(parser);Run Code Online (Sandbox Code Playgroud)
我为此创造了一个宝石。
现在你可以通过运行安装它gem install dynamocli,然后你可以使用命令:
dynamocli import your_data.csv --to your_table
Run Code Online (Sandbox Code Playgroud)
这是源代码的链接:https : //github.com/matheussilvasantos/dynamocli
我编写了一个工具来使用并行执行来执行此操作,该工具不需要在机器上安装任何依赖项或开发人员工具(它是用 Go 编写的)。
它可以处理:
它适用于 MacOS、Linux、Windows 和 Docker:https : //github.com/ah/ddbimport
以下是我的测试结果,表明它可以使用 AWS Step Functions 更快地并行导入。
我将在 2020 年 5 月 15 日 BST 1155 的 AWS 社区峰会上更详细地描述该工具 - https://www.twitch.tv/awscomsum
我建议使用DynamoDB的本地版本,以防万一,请在开始收费之前检查一下。在发布此消息之前,我做了一些小的修改,因此请务必使用对您有意义的任何方式进行测试。我已注释掉一个虚假的批量上传作业,您可以用它代替远程或本地的任何DynamoDB服务,以在stdout中验证这是否符合您的需求。
如果您走了手动安装路线,则可以使用类似以下的方式启动dynamodb-local:
java -Djava.library.path=<PATH_TO_DYNAMODB_LOCAL>/DynamoDBLocal_lib/\
-jar <PATH_TO_DYNAMODB_LOCAL>/DynamoDBLocal.jar\
-inMemory\
-sharedDb
Run Code Online (Sandbox Code Playgroud)
npm路线可能更简单。
除此之外,请参见dynamodb-admin。
我使用安装了dynamodb-admin npm i -g dynamodb-admin。然后可以使用以下命令运行它:
dynamodb-admin
Run Code Online (Sandbox Code Playgroud)
dynamodb-local默认为localhost:8000。
dynamodb-admin是默认为的网页localhost:8001。启动这两项服务后,请localhost:8001在浏览器中打开以查看和操作数据库。
下面的脚本不会创建数据库。使用dynamodb-admin此。
maxSocketsconfig选项,并修改创建批处理数组的代码,使其仅向前传递单个批处理,进行试验。java -Djava.library.path=<PATH_TO_DYNAMODB_LOCAL>/DynamoDBLocal_lib/\
-jar <PATH_TO_DYNAMODB_LOCAL>/DynamoDBLocal.jar\
-inMemory\
-sharedDb
Run Code Online (Sandbox Code Playgroud)
更新了2019 Javascript代码
我对上面的任何Javascript代码示例都不太满意。从上面的Hassan Siddique答案开始,我已经更新到最新的API,包括示例凭证代码,将所有用户配置移至顶部,在缺少时添加了uuid()并去除了空白字符串。
const fs = require('fs');
const parse = require('csv-parse');
const async = require('async');
const uuid = require('uuid/v4');
const AWS = require('aws-sdk');
// --- start user config ---
const AWS_CREDENTIALS_PROFILE = 'serverless-admin';
const CSV_FILENAME = "./majou.csv";
const DYNAMODB_REGION = 'eu-central-1';
const DYNAMODB_TABLENAME = 'entriesTable';
// --- end user config ---
const credentials = new AWS.SharedIniFileCredentials({
profile: AWS_CREDENTIALS_PROFILE
});
AWS.config.credentials = credentials;
const docClient = new AWS.DynamoDB.DocumentClient({
region: DYNAMODB_REGION
});
const rs = fs.createReadStream(CSV_FILENAME);
const parser = parse({
columns: true,
delimiter: ','
}, function(err, data) {
var split_arrays = [],
size = 25;
while (data.length > 0) {
split_arrays.push(data.splice(0, size));
}
data_imported = false;
chunk_no = 1;
async.each(split_arrays, function(item_data, callback) {
const params = {
RequestItems: {}
};
params.RequestItems[DYNAMODB_TABLENAME] = [];
item_data.forEach(item => {
for (key of Object.keys(item)) {
// An AttributeValue may not contain an empty string
if (item[key] === '')
delete item[key];
}
params.RequestItems[DYNAMODB_TABLENAME].push({
PutRequest: {
Item: {
id: uuid(),
...item
}
}
});
});
docClient.batchWrite(params, function(err, res, cap) {
console.log('done going next');
if (err == null) {
console.log('Success chunk #' + chunk_no);
data_imported = true;
} else {
console.log(err);
console.log('Fail chunk #' + chunk_no);
data_imported = false;
}
chunk_no++;
callback();
});
}, function() {
// run after loops
console.log('all data imported....');
});
});
rs.pipe(parser);
Run Code Online (Sandbox Code Playgroud)
| 归档时间: |
|
| 查看次数: |
49216 次 |
| 最近记录: |