Iur*_*tup 6 python amazon-web-services amazon-dynamodb aws-glue
我需要将数据从一个发电机表复制到另一个发电机表,并在此过程中进行一些转换。为此,我将数据从源表导出到 s3 并对其运行爬虫。在我的 Glue Job 中,我使用以下代码:
mapped = apply_mapping.ApplyMapping.apply(
frame=source_df,
mappings=[
("item.uuid.S", "string", "uuid", "string"),
("item.options.SS", "set", "options", "set"),
("item.updatedAt.S", "string", "updatedAt", "string"),
("item.createdAt.S", "string", "createdAt", "string")
],
transformation_ctx='mapped'
)
df = mapped.toDF() //convert to spark df
// apply some transformation
target_df = DynamicFrame.fromDF(df, glue_context, 'target_df') //convert to dynamic frame
glue_context.write_dynamic_frame_from_options(
frame=target_df,
connection_type="dynamodb",
connection_options={
"dynamodb.region": "eu-west-1",
"dynamodb.output.tableName": "my-table",
"dynamodb.throughput.write.percent": "1.0"
}
)
Run Code Online (Sandbox Code Playgroud)
在源 dynamo 表中,该options字段是一个字符串集。在转变过程中,它保持不变。但是,目标表中有一个字符串列表:
"options": {
"L": [
{
"S": "option A"
},
{
"S": "option B"
}
]
}
Run Code Online (Sandbox Code Playgroud)
谁能建议如何使用 AWS Glue 将字符串集写入 DynamoDB 中?
不幸的是,我找不到使用 Glue 接口将字符串集写入 DynamoDB 的方法。我找到了一些使用 boto3 和 Spark 的解决方案,所以这是我的解决方案。我跳过了转换部分并总体简化了示例。
# Load source data from catalog
source_dyf = glue_context.create_dynamic_frame_from_catalog(
GLUE_DB, GLUE_TABLE, transformation_ctx="source"
)
# Map dynamo attributes
mapped_dyf = ApplyMapping.apply(
frame=source_dyf,
mappings=[
("item.uuid.S", "string", "uuid", "string"),
("item.options.SS", "set", "options", "set"),
("item.updatedAt.S", "string", "updatedAt", "string"),
("item.updatedAt.S", "string", "createdAt", "string")
],
transformation_ctx='mapped'
)
def _putitem(items):
resource = boto3.resource("dynamodb")
table = resource.Table("new_table")
with table.batch_writer() as batch_writer:
for item in items:
batch_writer.put_item(Item=item)
df = mapped_dyf.toDF()
# Apply spark transformations ...
# save partitions to dynamo
df.rdd.mapPartitions(_putitem).collect()
Run Code Online (Sandbox Code Playgroud)
根据您的数据量,您可能需要增加 boto3 中的重试次数,甚至更改机制。另外,您可能想使用 DynamoDB 配置。我切换到按需运行此特定迁移,但有一个问题
| 归档时间: |
|
| 查看次数: |
2154 次 |
| 最近记录: |