DynamoDB 数据迁移与转换

Ana*_*nt 5 etl amazon-s3 amazon-web-services amazon-dynamodb

我需要将数据从表A迁移到表B。

我需要将表 A 中具有最大 rangeKey 的每个项目移动到表 B。

也就是说,每个具有最大版本的项目将驻留在表 B 中。

我想过扫描表 A,然后对表 B 进行条件写入(如果仅存在具有较小 rangeKey 值的项目,或者如果该项目根本不存在,则写入),但这似乎不可行,因为这需要太长时间(表 A 很大)。

有一个更好的方法吗 ?

谢谢!

Mat*_*ava 2

是的,它可以更有效地完成,但首先您需要获取存储在表中的分区键的所有值。

如果您还不知道这些值,那么您将需要从scan表中提取这些唯一值,您可以使用它ProjectionExpression来仅返回分区键值。

Python 中的示例

def get_partition_key_values():
    response = dynamodb_client.scan(
        TableName=SOURCE_TABLE,
        ProjectionExpression=PARTITION_KEY_NAME
    )

    values = [item[PARTITION_KEY_NAME][PARTITION_KEY_TYPE]
              for item in response['Items']]
    return set(values)
Run Code Online (Sandbox Code Playgroud)

现在您已经有了分区键值,您可以简单地在循环中查询表中的每个唯一分区键,使用设置ScanIndexForward为的属性FalseLimit返回值的数量对结果进行排序,其中1每个分区键将按“最大”排序返回一项钥匙。

当您循环遍历键并获取所需的项目时,您可以将它们一一放入目标表中

def copy_items(partition_keys):
    for key in partition_keys:
        item = dynamodb_client.query(
            TableName=SOURCE_TABLE,
            KeyConditionExpression='#pid = :pid',
            ExpressionAttributeNames={
                '#pid': PARTITION_KEY_NAME
            },
            ExpressionAttributeValues={
                ':pid': {
                    PARTITION_KEY_TYPE: key
                }
            },
            Limit=1,
            ScanIndexForward=False
        )['Items'][0]

        dynamodb_client.put_item(
            TableName=DESTINATION_TABLE,
            Item=item
        )
Run Code Online (Sandbox Code Playgroud)

这是完整的代码

import boto3

dynamodb_client = boto3.client('dynamodb')

SOURCE_TABLE = 'products'
DESTINATION_TABLE = 'products_copy'

PARTITION_KEY_NAME = 'product_id'
PARTITION_KEY_TYPE = 'S'


def get_partition_key_values():
    response = dynamodb_client.scan(
        TableName=SOURCE_TABLE,
        ProjectionExpression=PARTITION_KEY_NAME
    )

    values = [item[PARTITION_KEY_NAME][PARTITION_KEY_TYPE]
              for item in response['Items']]
    return set(values)


def copy_items(partition_keys):
    for key in partition_keys:
        item = dynamodb_client.query(
            TableName=SOURCE_TABLE,
            KeyConditionExpression='#pid = :pid',
            ExpressionAttributeNames={
                '#pid': PARTITION_KEY_NAME
            },
            ExpressionAttributeValues={
                ':pid': {
                    PARTITION_KEY_TYPE: key
                }
            },
            Limit=1,
            ScanIndexForward=False
        )['Items'][0]

        dynamodb_client.put_item(
            TableName=DESTINATION_TABLE,
            Item=item
        )


unique_partition_key_values = get_partition_key_values()

copy_items(unique_partition_key_values)
Run Code Online (Sandbox Code Playgroud)

请注意,上面的代码假设源表和目标表具有相同的主键架构。如果这两个模式在您的情况下不同,那么您将需要执行一些额外的转换/映射。