我正在尝试使用 Glue 对从 RDS 迁移到 Redshift 的数据进行 ETL。
据我所知,Glue 书签仅使用指定的主键查找新行,而不跟踪更新的行。
然而,我正在处理的数据往往会频繁更新行,我正在寻找可能的解决方案。我对 pyspark 有点陌生,所以如果可以在 pyspark 中执行此操作,我将非常感谢一些指导或正确方向的观点。如果 Spark 之外有可能的解决方案,我也很想听听。
Glue 有一个非常奇怪的问题。使用它对我从 MySQL RDS 迁移到 Redshift 的数据运行一些 ETL。使用我在另一个表上使用的相同代码,它运行良好并按应有的方式复制了所有数据。
但是在第二个表上,由于某种原因,它不会从 MySQL 复制 id 列中的数据。Redshift 上的 id 列完全空白。
query_df = spark.read.format("jdbc").option("url",
args['RDSURL']).option("driver",
args['RDSDRIVER']).option("dbtable",
args['RDSQUERY']).option("user", args['RDSUSER']).option("password",
args['RDSPASS']).load()
datasource0 = DynamicFrame.fromDF(query_df, glueContext,
"datasource0")
logging.info(datasource0.show())
applymapping1 = ApplyMapping.apply(frame = datasource0, mappings =
[("id", "int", "id", "int"), ... , transformation_ctx =
"applymapping1")
logging.info(applymapping1.show())
Run Code Online (Sandbox Code Playgroud)
从上面打印的日志中,我可以看到即使在 ApplyMapping 之后动态框架也包含 id 字段。
datasink2 = glueContext.write_dynamic_frame.from_jdbc_conf(frame =
applymapping1, catalog_connection = args['RSCLUSTER'],
connection_options = {"dbtable": args['RSTABLE'], "database":
args['RSDB']}, redshift_tmp_dir = args["TempDir"],
transformation_ctx = "datasink2")
Run Code Online (Sandbox Code Playgroud)
我认为问题似乎发生在这里?在此作业完成后,在检查 Redshift 时,id 列完全为空。
对这种行为感到非常困惑。确切的代码在另一个表上运行良好,这两个表中的 id 之间的唯一区别是该表的 id …