AWS Glue 和更新重复数据

jos*_*y10 7 python etl amazon-web-services pyspark aws-glue

我正在使用 AWS Glue 将多个文件从 S3 移动到 RDS 实例。每天我都会在 S3 中获取一个新文件,该文件可能包含新数据,但也可能包含我已经保存的带有一些更新值的记录。如果我多次运行该作业,我当然会在数据库中获得重复的记录。如果 Glue 注意到某个字段已更改,我希望 Glue 尝试更新该记录,而不是插入多个记录,每个记录都有一个唯一的 ID。这可能吗?

Tha*_*mar 5

我遵循了 Yuriy 建议作为第二个选项的类似方法。获取现有数据以及新数据,然后进行一些处理以合并它们并以覆盖模式写入。以下代码将帮助您了解如何解决此问题。

sc = SparkContext()
glueContext = GlueContext(sc)

#get your source data 
src_data = create_dynamic_frame.from_catalog(database = src_db, table_name = src_tbl)
src_df =  src_data.toDF()


#get your destination data 
dst_data = create_dynamic_frame.from_catalog(database = dst_db, table_name = dst_tbl)
dst_df =  dst_data.toDF()

#Now merge two data frames to remove duplicates
merged_df = dst_df.union(src_df)

#Finally save data to destination with OVERWRITE mode
merged_df.write.format('jdbc').options(   url = dest_jdbc_url, 
                                          user = dest_user_name,
                                          password = dest_password,
                                          dbtable = dest_tbl ).mode("overwrite").save()
Run Code Online (Sandbox Code Playgroud)


Yur*_*ruk 3

不幸的是,没有优雅的方法可以用 Glue 来做到这一点。如果您要写入 Redshift,您可以使用它postactions来实现 Redshift 合并操作。但是,对于其他 jdbc 接收器来说这是不可能的(据我所知)。

或者,在 ETL 脚本中,您可以从数据库加载现有数据,以在保存之前过滤掉现有记录。但是,如果您的数据库表很大,那么作业可能需要一段时间来处理它。

另一种方法是首先以“覆盖”模式写入暂存表(替换现有暂存数据),然后通过 API 调用数据库以将新记录仅复制到最终表中。