Import a CSV or JSON file into DynamoDB

Asked by Fra*_*urt (7 votes) · Tags: import, dynamodb, csv

I have 1000 CSV files. Each CSV file is between 1 and 500 MB in size, and all of them share the same format (i.e., the same column order). I have a header file with the column headers, which match the column names of my DynamoDB table. I need to import these files into a DynamoDB table. What is the best way/tool to do this?

I could concatenate these CSV files into a single giant file (though I would rather avoid that), or convert them to JSON if necessary. I am aware of BatchWriteItem, so I assume a good solution will involve batch writing (a small sketch of such a call follows the example below).


Example:

  • The DynamoDB table has two columns: first_name, last_name
  • The header file contains only: first_name,last_name
  • A CSV file looks like:

John,Doe
Bob,Smith
Alice,Lee
Foo,Bar
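
As a rough illustration of what a single BatchWriteItem call looks like (a minimal sketch, not part of the original question; the table name people and the region are assumptions, and the boto3 client API is used here), each request can carry at most 25 put requests:

import boto3

dynamodb = boto3.client('dynamodb', region_name='us-east-1')  # region is an assumption

# One BatchWriteItem request may contain at most 25 put/delete requests.
response = dynamodb.batch_write_item(
    RequestItems={
        'people': [  # hypothetical table name
            {'PutRequest': {'Item': {'first_name': {'S': 'John'}, 'last_name': {'S': 'Doe'}}}},
            {'PutRequest': {'Item': {'first_name': {'S': 'Bob'}, 'last_name': {'S': 'Smith'}}}},
        ]
    }
)

# Anything DynamoDB could not write comes back here and must be retried.
unprocessed = response.get('UnprocessedItems', {})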

Answered by Fra*_*urt (12 votes)

In the end, I wrote a Python function import_csv_to_dynamodb(table_name, csv_file_name, column_names, column_types) that imports a CSV file into a DynamoDB table. The column names and column types must be specified. It uses boto and takes a lot of inspiration from this gist. Below are the function, a demo of its use (main()), and the CSV file used. Tested on Windows 7 x64 with Python 2.7.5, but it should work on any OS that has boto and Python.

import boto

MY_ACCESS_KEY_ID = 'copy your access key ID here'
MY_SECRET_ACCESS_KEY = 'copy your secret access key here'


def do_batch_write(items, table_name, dynamodb_table, dynamodb_conn):
    '''
    From https://gist.github.com/griggheo/2698152#file-gistfile1-py-L31
    '''
    batch_list = dynamodb_conn.new_batch_write_list()
    batch_list.add_batch(dynamodb_table, puts=items)
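    # Submit the batch and keep resubmitting until no unprocessed items remain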
    while True:
        response = dynamodb_conn.batch_write_item(batch_list)
        unprocessed = response.get('UnprocessedItems', None)
        if not unprocessed:
            break
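        # Rebuild the batch from the unprocessed items and try again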
        batch_list = dynamodb_conn.new_batch_write_list()
        unprocessed_list = unprocessed[table_name]
        items = []
        for u in unprocessed_list:
            item_attr = u['PutRequest']['Item']
            item = dynamodb_table.new_item(
                    attrs=item_attr
            )
            items.append(item)
        batch_list.add_batch(dynamodb_table, puts=items)


def import_csv_to_dynamodb(table_name, csv_file_name, column_names, column_types):
    '''
    Import a CSV file to a DynamoDB table
    '''        
    dynamodb_conn = boto.connect_dynamodb(aws_access_key_id=MY_ACCESS_KEY_ID, aws_secret_access_key=MY_SECRET_ACCESS_KEY)
    dynamodb_table = dynamodb_conn.get_table(table_name)     
    BATCH_COUNT = 2 # items per batch write; 25 is the maximum batch size for Amazon DynamoDB

    items = []

    count = 0
    csv_file = open(csv_file_name, 'r')
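    # Parse each line with a plain comma split; assumes no quoted fields containing commas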
    for cur_line in csv_file:
        count += 1
        cur_line = cur_line.strip().split(',')

        row = {}
        # Convert each CSV field with the caller-supplied type for its column
        for column_number, column_name in enumerate(column_names):
            row[column_name] = column_types[column_number](cur_line[column_number])

        item = dynamodb_table.new_item(attrs=row)
        items.append(item)

        if count % BATCH_COUNT == 0:
            print 'batch write start ... ', 
            do_batch_write(items, table_name, dynamodb_table, dynamodb_conn)
            items = []
            print 'batch done! (row number: ' + str(count) + ')'

    # flush remaining items, if any
    if len(items) > 0: 
        do_batch_write(items, table_name, dynamodb_table, dynamodb_conn)


    csv_file.close() 


def main():
    '''
    Demonstration of the use of import_csv_to_dynamodb()
    We assume the existence of a table named `test_persons`, with
    - Last_name as primary hash key (type: string)
    - First_name as primary range key (type: string)
    '''
    column_names = 'Last_name First_name'.split()
    table_name = 'test_persons'
    csv_file_name = 'test.csv'
    column_types = [str, str]
    import_csv_to_dynamodb(table_name, csv_file_name, column_names, column_types)


if __name__ == "__main__":
    main()
    #cProfile.run('main()') # if you want to do some profiling

Content of test.csv (it must be in the same folder as the Python script):

John,Doe
Bob,Smith
Alice,Lee
Foo,Bar
a,b
c,d
e,f
g,h
i,j
j,l
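
The answer above uses the old boto library, which has since been superseded by boto3. For comparison, here is a minimal sketch (not part of the original answer; Python 3 and the region are assumptions, and the test_persons table and test.csv file are the ones from the demo above) of the same batch import with boto3, whose batch_writer() groups items into 25-item BatchWriteItem calls and resubmits unprocessed items automatically:

import csv

import boto3

# batch_writer() handles batching (max 25 items per BatchWriteItem call)
# and automatically retries any unprocessed items.
table = boto3.resource('dynamodb', region_name='us-east-1').Table('test_persons')

with open('test.csv', newline='') as csv_file, table.batch_writer() as batch:
    for row in csv.reader(csv_file):
        if not row:  # skip blank lines
            continue
        # Same column mapping as the demo: first CSV column -> Last_name, second -> First_name
        last_name, first_name = row
        batch.put_item(Item={'Last_name': last_name, 'First_name': first_name})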