jos*_*y10 7 python etl amazon-web-services pyspark aws-glue
我正在使用 AWS Glue 将多个文件从 S3 移动到 RDS 实例。每天我都会在 S3 中获取一个新文件,该文件可能包含新数据,但也可能包含我已经保存的带有一些更新值的记录。如果我多次运行该作业,我当然会在数据库中获得重复的记录。如果 Glue 注意到某个字段已更改,我希望 Glue 尝试更新该记录,而不是插入多个记录,每个记录都有一个唯一的 ID。这可能吗?
我遵循了 Yuriy 建议作为第二个选项的类似方法。获取现有数据以及新数据,然后进行一些处理以合并它们并以覆盖模式写入。以下代码将帮助您了解如何解决此问题。
sc = SparkContext()
glueContext = GlueContext(sc)
#get your source data
src_data = create_dynamic_frame.from_catalog(database = src_db, table_name = src_tbl)
src_df = src_data.toDF()
#get your destination data
dst_data = create_dynamic_frame.from_catalog(database = dst_db, table_name = dst_tbl)
dst_df = dst_data.toDF()
#Now merge two data frames to remove duplicates
merged_df = dst_df.union(src_df)
#Finally save data to destination with OVERWRITE mode
merged_df.write.format('jdbc').options( url = dest_jdbc_url,
user = dest_user_name,
password = dest_password,
dbtable = dest_tbl ).mode("overwrite").save()
Run Code Online (Sandbox Code Playgroud)
不幸的是,没有优雅的方法可以用 Glue 来做到这一点。如果您要写入 Redshift,您可以使用它postactions来实现 Redshift 合并操作。但是,对于其他 jdbc 接收器来说这是不可能的(据我所知)。
或者,在 ETL 脚本中,您可以从数据库加载现有数据,以在保存之前过滤掉现有记录。但是,如果您的数据库表很大,那么作业可能需要一段时间来处理它。
另一种方法是首先以“覆盖”模式写入暂存表(替换现有暂存数据),然后通过 API 调用数据库以将新记录仅复制到最终表中。
| 归档时间: |
|
| 查看次数: |
6297 次 |
| 最近记录: |