AWS Glue 未将 id(int) 列复制到 Redshift - 它是空白的

Sur*_*ndy 5 mysql amazon-web-services amazon-redshift pyspark aws-glue

Glue 有一个非常奇怪的问题。使用它对我从 MySQL RDS 迁移到 Redshift 的数据运行一些 ETL。使用我在另一个表上使用的相同代码,它运行良好并按应有的方式复制了所有数据。

但是在第二个表上,由于某种原因,它不会从 MySQL 复制 id 列中的数据。Redshift 上的 id 列完全空白。

query_df = spark.read.format("jdbc").option("url", 
args['RDSURL']).option("driver", 
args['RDSDRIVER']).option("dbtable", 
args['RDSQUERY']).option("user", args['RDSUSER']).option("password", 
args['RDSPASS']).load()

datasource0 = DynamicFrame.fromDF(query_df, glueContext, 
"datasource0")

logging.info(datasource0.show())

applymapping1 = ApplyMapping.apply(frame = datasource0, mappings = 
[("id", "int", "id", "int"), ... , transformation_ctx = 
"applymapping1")

logging.info(applymapping1.show())
Run Code Online (Sandbox Code Playgroud)

从上面打印的日志中,我可以看到即使在 ApplyMapping 之后动态框架也包含 id 字段。

datasink2 = glueContext.write_dynamic_frame.from_jdbc_conf(frame = 
applymapping1, catalog_connection = args['RSCLUSTER'], 
connection_options = {"dbtable": args['RSTABLE'], "database": 
args['RSDB']}, redshift_tmp_dir = args["TempDir"], 
transformation_ctx = "datasink2")
Run Code Online (Sandbox Code Playgroud)

我认为问题似乎发生在这里?在此作业完成后,在检查 Redshift 时,id 列完全为空。

对这种行为感到非常困惑。确切的代码在另一个表上运行良好,这两个表中的 id 之间的唯一区别是该表的 id 为 int (11) unsigned 而代码工作的表的 id 为 int (10) 签名。

小智 2

我在使用 Glue 从 MySQL RDS 中提取数据时就遇到过这种行为。对于任何寻求此问题答案的人 - 原因如下:AWSGlue 有“类型选择”的概念,其中已爬网列的确切类型可以在整个 ETL 作业中保留为多种可能性,因为爬网程序仅爬网列数据的子集来确定可能的类型,但不能最终确定。这就是为什么转换为使用显式架构而不是爬虫将解决问题,因为它不涉及任何类型选择。

当作业运行时(或者您查看预览)Spark 将尝试处理整个列数据集。此时,列类型可能会解析为与数据集不兼容的类型 - 即解释器无法决定正确的类型选择,这会导致相关列的数据为空。我在从 MySQL 数据库转换多个表时经历过这种情况,并且没有明显的模式说明为什么有些失败而有些失败,我已经能够确定,尽管它必须与数据库列中的数据相关。

解决方案是在脚本中添加明确的选择解决方案,方法是将失败的列转换为所需的目标类型,如下所示:

df.resolveChoice(specs = [('id', 'cast:int')])
Run Code Online (Sandbox Code Playgroud)

其中 df 是数据框。这将强制将该列解释为预期类型,并应导致该列中数据的预期输出。这一直对我有用。

请注意,对于使用 Glue Studio 可视化编辑器的用户,现在可以添加“自定义转换”步骤,其中包含为您执行此操作的代码。在本例中,转换代码应如下所示:

def MyTransform (glueContext, dfc) -> DynamicFrameCollection:
df = dfc.select(list(dfc.keys())[0])
df_resolved = df.resolveChoice(specs = [('id', 'cast:int')])
return (DynamicFrameCollection({"CustomTransform0": df_resolved}, glueContext))
Run Code Online (Sandbox Code Playgroud)

另请注意,在这种情况下,需要在此自定义转换节点后进行“从集合中选择”转换,因为自定义转换返回集合而不是单个帧。