mar*_*son 5 amazon-web-services apache-spark pyspark aws-glue
有人知道在Glue作业中将源文件名添加为列的方法吗?
我们创建了一个流程,在其中爬行了S3中的一些文件以创建模式。然后,我们编写了一个作业,将文件转换为新格式,并将这些文件作为CSV写回到另一个S3存储桶中,以供我们的管道的其余部分使用。我们想要做的是访问某种作业元属性,以便我们可以向输出文件中添加一个包含原始文件名的新列。
我浏览了AWS文档和aws-glue-libs源,但没有发现任何问题。理想情况下,将有某种方法可以从awsglue.job包中获取元数据(我们使用的是python风格)。
我仍在学习Glue,因此如果我使用了错误的术语,我们深表歉意。我也用spark标签对其进行了标记,因为我相信这就是Glue在幕后使用的东西。
通过 AWS Glue Python 自动生成的脚本,我添加了以下行:
from pyspark.sql.functions import input_file_name
## Add the input file name column
datasource1 = datasource0.toDF().withColumn("input_file_name", input_file_name())
## Convert DataFrame back to DynamicFrame
datasource2 = datasource0.fromDF(datasource1, glueContext, "datasource2")
Run Code Online (Sandbox Code Playgroud)
然后,在代码的ApplyMapping或部分中,您引用.datasinkdatasource2
您可以在 etl 工作中使用 Spark 来完成此操作:
var df = glueContext.getCatalogSource(
database = database,
tableName = table,
transformationContext = s"source-$database.$table"
).getDynamicFrame()
.toDF()
.withColumn("input_file_name", input_file_name())
glueContext.getSinkWithFormat(
connectionType = "s3",
options = JsonOptions(Map(
"path" -> args("DST_S3_PATH")
)),
transformationContext = "",
format = "parquet"
).writeDynamicFrame(DynamicFrame(df, glueContext))
Run Code Online (Sandbox Code Playgroud)
请记住,它仅适用于 getCatalogSource() API,不适用于 create_dynamic_frame_from_options()
| 归档时间: |
|
| 查看次数: |
3652 次 |
| 最近记录: |