Wil*_*oft 4 amazon-s3 amazon-web-services apache-spark pyspark aws-glue
我正在尝试获取通过 AWS Glue 中的 S3 数据目录加载的每个文件的输入文件名(或路径)。
我读过一些input_file_name()
应该提供这些信息的地方(尽管警告说这只在调用时有效,from_catalog
而不是from_options
,我相信我是!)。
所以下面的代码似乎应该可以工作,但总是为每个input_file_name
.
import sys
from awsglue.context import GlueContext
from awsglue.job import Job
from awsglue.utils import getResolvedOptions
from pyspark.context import SparkContext
from pyspark.sql.functions import input_file_name
args = getResolvedOptions(sys.argv, ['JOB_NAME', 'TempDir'])
sc = SparkContext()
gc = GlueContext(sc)
spark = gc.spark_session
job = Job(gc)
job.init(args['JOB_NAME'], args)
# Get the source frame from the Glue Catalog, which describes files in S3
fm_source = gc.create_dynamic_frame.from_catalog(
database='database_name',
table_name='table_name',
transformation_ctx='fm_source',
)
df_source = fm_source.toDF().withColumn('input_file_name', input_file_name())
df_source.show(5)
Run Code Online (Sandbox Code Playgroud)
结果输出:
+-------------+---------------+
|other_columns|input_file_name|
+-------------+---------------+
| 13| |
| 33| |
| 53| |
| 73| |
| 93| |
+-------------+---------------+
Run Code Online (Sandbox Code Playgroud)
还有另一种方法可以创建确保input_file_name()
填充的框架吗?我现在尝试通过create_dynamic_frame.from_catalog
,create_dynamic_frame.from_options
和构建源框架getSource().getFrame()
,但我得到相同的结果,input_file_name
每个框架都有一个空列。
我还添加了我的经验,就我而言,由于调用了该cache()
方法,我收到了一个空结果。
例如:
import pyspark.sql.functions as F
df = spark.read.json("/my/folder/test.json")
df.cache()
df = df.withColumn("input_file_name", F.input_file_name())
df.show()
Run Code Online (Sandbox Code Playgroud)
我收到
+-------------+---------------+
|other_columns|input_file_name|
+-------------+---------------+
| 13| |
| 33| |
| 53| |
| 73| |
| 93| |
+-------------+---------------+
Run Code Online (Sandbox Code Playgroud)
但是,如果我删除该行,df.cache()
该列会input_file_name
正确显示输入文件名。
一种解决方法可能是F.input_file_name()
在缓存之前调用。
我相信在使用该groupFiles
选项时这是不可能的,因为在幕后 Glue 正在连接文件以创建最佳数量的输入。因此, 的概念input_file_name
在这种情况下没有意义,因为原始文件路径不再是直接输入。
然而,这些文档在某种意义上有点误导,即使对于少于 50,000 个文件的输入,不明确禁用该选项也会触发 Glue 根据其文件大小连接输入。在我们的例子中,我们有数千个微小的输入文件 (<1 MB) 导致了这种行为。
您可以通过显式禁用分组来轻松验证这一点(请注意,这将对类似于我们的场景产生严重的性能影响:
ds_s3 = self.gc.getSource(
connection_type='s3',
paths=paths,
groupFiles='none',
)
fm_s3 = ds_s3.getFrame()
Run Code Online (Sandbox Code Playgroud)
当然,最好不要依赖于输入状态或上下文,因此我们最终编写了一个在 S3PUT
上触发的 AWS Lambda,它将元数据(包括文件名和路径)写入文件本身。
归档时间: |
|
查看次数: |
3781 次 |
最近记录: |