为什么 pyspark 中的 S3 目录源的 input_file_name() 为空?

Wil*_*oft 4 amazon-s3 amazon-web-services apache-spark pyspark aws-glue

我正在尝试获取通过 AWS Glue 中的 S3 数据目录加载的每个文件的输入文件名(或路径)。

读过一些input_file_name()应该提供这些信息的地方(尽管警告说这只在调用时有效,from_catalog而不是from_options,我相信我是!)。

所以下面的代码似乎应该可以工作,但总是为每个input_file_name.

import sys

from awsglue.context import GlueContext
from awsglue.job import Job
from awsglue.utils import getResolvedOptions
from pyspark.context import SparkContext
from pyspark.sql.functions import input_file_name


args = getResolvedOptions(sys.argv, ['JOB_NAME', 'TempDir'])
sc = SparkContext()
gc = GlueContext(sc)
spark = gc.spark_session


job = Job(gc)
job.init(args['JOB_NAME'], args)


# Get the source frame from the Glue Catalog, which describes files in S3
fm_source = gc.create_dynamic_frame.from_catalog(
    database='database_name',
    table_name='table_name',
    transformation_ctx='fm_source',
)

df_source = fm_source.toDF().withColumn('input_file_name', input_file_name())
df_source.show(5)
Run Code Online (Sandbox Code Playgroud)

结果输出:

+-------------+---------------+
|other_columns|input_file_name|
+-------------+---------------+
|           13|               |
|           33|               |
|           53|               |
|           73|               |
|           93|               |
+-------------+---------------+
Run Code Online (Sandbox Code Playgroud)

还有另一种方法可以创建确保input_file_name()填充的框架吗?我现在尝试通过create_dynamic_frame.from_catalog,create_dynamic_frame.from_options和构建源框架getSource().getFrame(),但我得到相同的结果,input_file_name每个框架都有一个空列。

Vzz*_*arr 7

我还添加了我的经验,就我而言,由于调用了该cache()方法,我收到了一个空结果。

例如:

import pyspark.sql.functions as F

df = spark.read.json("/my/folder/test.json")
df.cache()
df = df.withColumn("input_file_name", F.input_file_name())

df.show()
Run Code Online (Sandbox Code Playgroud)

我收到

+-------------+---------------+
|other_columns|input_file_name|
+-------------+---------------+
|           13|               |
|           33|               |
|           53|               |
|           73|               |
|           93|               |
+-------------+---------------+
Run Code Online (Sandbox Code Playgroud)

但是,如果我删除该行,df.cache()该列会input_file_name正确显示输入文件名。

一种解决方法可能是F.input_file_name()在缓存之前调用。

  • 绝妙的洞察力! (2认同)
  • 这对我也有用。如果您需要在交互式 Spark 会话(即 Jupyter Notebook)中删除缓存表,请考虑使用“spark.catalog.clearCache()” (2认同)

Wil*_*oft 6

我相信在使用该groupFiles选项时这是不可能的,因为在幕后 Glue 正在连接文件以创建最佳数量的输入。因此, 的概念input_file_name在这种情况下没有意义,因为原始文件路径不再是直接输入。

然而,这些文档在某种意义上有点误导,即使对于少于 50,000 个文件的输入,不明确禁用该选项也会触发 Glue 根据其文件大小连接输入。在我们的例子中,我们有数千个微小的输入文件 (<1 MB) 导致了这种行为。

您可以通过显式禁用分组来轻松验证这一点(请注意,这将对类似于我们的场景产生严重的性能影响:

ds_s3 = self.gc.getSource(
    connection_type='s3',
    paths=paths,
    groupFiles='none',
)
fm_s3 = ds_s3.getFrame()
Run Code Online (Sandbox Code Playgroud)

当然,最好不要依赖于输入状态或上下文,因此我们最终编写了一个在 S3PUT上触发的 AWS Lambda,它将元数据(包括文件名和路径)写入文件本身。