如何在 AWS Glue 中正确重命名动态数据帧的列?

Cel*_*ell 4 python amazon-web-services pyspark amazon-athena aws-glue

我加载 JSON 数据并在动态数据帧上使用关系化方法来展平原本嵌套的 JSON 对象并将其保存为镶木地板格式。问题是,一旦保存为 parquet 格式以实现更快的 Athena 查询,列名称将包含点,这违反了 Athena SQL 查询语法,因此我无法进行特定于列的查询。

为了解决这个问题,我还重命名了 Glue 作业中的列名称以排除点并添加下划线。我的问题是这两种方法哪种更好,为什么?(效率-内存?节点上的执行速度?等等)。

另外,鉴于糟糕的 awsglue 文档,我无法想出仅动态框架的解决方案。我在以动态方式获取列名称时遇到问题,因此我正在利用 toDF()。

1)第一种方法是从动态 df 中提取 df 获取列名

relationalize1 = Relationalize.apply(frame=datasource0, transformation_ctx="relationalize1").select("roottable")
    df_relationalize1 = relationalize1.toDF()
    for field in df_relationalize1.schema.fields:
        relationalize1 = RenameField.apply(frame = relationalize1, old_name = "`"+field.name+"`", new_name = field.name.replace(".","_"), transformation_ctx = "renamefield_" + field.name)
Run Code Online (Sandbox Code Playgroud)

2)第二种方法是从动态 df 中提取 df 并在 pyspark df (而不是动态 df)上执行重命名字段,然后转换回动态 df 并将其保存为 parquet 格式。

有更好的方法吗?爬虫可以重命名列吗?.fromDF() 方法有多快?有没有比 pdf 开发人员指南更好的函数和方法文档?

abl*_*nge 6

该问题特别询问有关重命名的问题:

(a) 转换为DataFrame.
(b)new_columns以与 相同的顺序创建具有所需列名称的数组old_columns
(c)new_columns使用functools.reduce()和覆盖并保留pyspark.withColumnRenamed()
(d) 转换回DynamicFrame.

from awsglue.job import Job
from awsglue.context import GlueContext
from pyspark.context import SparkContext
from functools import reduce

JOB_NAME = "csv_to_parquet"
sc = SparkContext()
glue_context = GlueContext(sc)
job = Job(glue_context)
job.init(JOB_NAME)

# Create DynamicFrame
datasource = glue_context.create_dynamic_frame_from_options(
    connection_type="s3",
    format="csv",
    connection_options={"paths": ["s3://path/to/source/file.csv"]},
    format_options={"withHeader": True, "separator": chr(44)},  # comma delimited
)

# (a) Convert to DataFrame
df = datasource.toDF()

# (b) Create array with desired columns
old_columns = df.schema.names
new_columns = [
    field.lower().replace(" ", "_").replace(".", "_") for field in old_columns
]

# (c) Overwrite and persist `new_columns`
df = reduce(
    lambda df, idx: df.withColumnRenamed(old_columns[idx], new_columns[idx]),
    range(len(old_columns)),
    df,
)

# (d) Convert back to DynamicFrame
datasource = datasource.fromDF(df, glue_context, "datasource")

# Write DynamicFrame as Parquet
datasink = glue_context.write_dynamic_frame_from_options(
    frame=datasource,
    connection_type="s3",
    connection_options={"path": "s3://path/to/target/prefix/"},
    format="parquet",
)
Run Code Online (Sandbox Code Playgroud)

块引用