AWS Glue:如何在输出中添加带有源文件名的列?

mar*_*son 5 amazon-web-services apache-spark pyspark aws-glue

有人知道在Glue作业中将源文件名添加为列的方法吗?

我们创建了一个流程,在其中爬行了S3中的一些文件以创建模式。然后,我们编写了一个作业,将文件转换为新格式,并将这些文件作为CSV写回到另一个S3存储桶中,以供我们的管道的其余部分使用。我们想要做的是访问某种作业元属性,以便我们可以向输出文件中添加一个包含原始文件名的新列。

我浏览了AWS文档和aws-glue-libs源,但没有发现任何问题。理想情况下,将有某种方法可以从awsglue.job包中获取元数据(我们使用的是python风格)。

我仍在学习Glue,因此如果我使用了错误的术语,我们深表歉意。我也用spark标签对其进行了标记,因为我相信这就是Glue在幕后使用的东西。

JcM*_*aco 5

通过 AWS Glue Python 自动生成的脚本,我添加了以下行:

from pyspark.sql.functions import input_file_name

## Add the input file name column
datasource1 = datasource0.toDF().withColumn("input_file_name", input_file_name())

## Convert DataFrame back to DynamicFrame
datasource2 = datasource0.fromDF(datasource1, glueContext, "datasource2")
Run Code Online (Sandbox Code Playgroud)

然后,在代码的ApplyMapping或部分中,您引用.datasinkdatasource2


Yur*_*ruk 4

您可以在 etl 工作中使用 Spark 来完成此操作:

var df = glueContext.getCatalogSource(
  database = database,
  tableName = table,
  transformationContext = s"source-$database.$table"
).getDynamicFrame()
 .toDF()
 .withColumn("input_file_name", input_file_name())

glueContext.getSinkWithFormat(
  connectionType = "s3",
  options = JsonOptions(Map(
    "path" -> args("DST_S3_PATH")
  )),
  transformationContext = "",
  format = "parquet"
).writeDynamicFrame(DynamicFrame(df, glueContext))
Run Code Online (Sandbox Code Playgroud)

请记住,它仅适用于 getCatalogSource() API,不适用于 create_dynamic_frame_from_options()

  • 效果很好!我用“from pyspark.sql.functions import input_file_name”导入了“input_file_name”。 (2认同)