Cel*_*ell 4 python amazon-web-services pyspark amazon-athena aws-glue
我加载 JSON 数据并在动态数据帧上使用关系化方法来展平原本嵌套的 JSON 对象并将其保存为镶木地板格式。问题是,一旦保存为 parquet 格式以实现更快的 Athena 查询,列名称将包含点,这违反了 Athena SQL 查询语法,因此我无法进行特定于列的查询。
为了解决这个问题,我还重命名了 Glue 作业中的列名称以排除点并添加下划线。我的问题是这两种方法哪种更好,为什么?(效率-内存?节点上的执行速度?等等)。
另外,鉴于糟糕的 awsglue 文档,我无法想出仅动态框架的解决方案。我在以动态方式获取列名称时遇到问题,因此我正在利用 toDF()。
1)第一种方法是从动态 df 中提取 df 获取列名
relationalize1 = Relationalize.apply(frame=datasource0, transformation_ctx="relationalize1").select("roottable")
df_relationalize1 = relationalize1.toDF()
for field in df_relationalize1.schema.fields:
relationalize1 = RenameField.apply(frame = relationalize1, old_name = "`"+field.name+"`", new_name = field.name.replace(".","_"), transformation_ctx = "renamefield_" + field.name)
Run Code Online (Sandbox Code Playgroud)
2)第二种方法是从动态 df 中提取 df 并在 pyspark df (而不是动态 df)上执行重命名字段,然后转换回动态 df 并将其保存为 parquet 格式。
有更好的方法吗?爬虫可以重命名列吗?.fromDF() 方法有多快?有没有比 pdf 开发人员指南更好的函数和方法文档?
该问题特别询问有关重命名的问题:
(a) 转换为DataFrame.
(b)new_columns以与 相同的顺序创建具有所需列名称的数组old_columns。
(c)new_columns使用functools.reduce()和覆盖并保留pyspark.withColumnRenamed()。
(d) 转换回DynamicFrame.
from awsglue.job import Job
from awsglue.context import GlueContext
from pyspark.context import SparkContext
from functools import reduce
JOB_NAME = "csv_to_parquet"
sc = SparkContext()
glue_context = GlueContext(sc)
job = Job(glue_context)
job.init(JOB_NAME)
# Create DynamicFrame
datasource = glue_context.create_dynamic_frame_from_options(
connection_type="s3",
format="csv",
connection_options={"paths": ["s3://path/to/source/file.csv"]},
format_options={"withHeader": True, "separator": chr(44)}, # comma delimited
)
# (a) Convert to DataFrame
df = datasource.toDF()
# (b) Create array with desired columns
old_columns = df.schema.names
new_columns = [
field.lower().replace(" ", "_").replace(".", "_") for field in old_columns
]
# (c) Overwrite and persist `new_columns`
df = reduce(
lambda df, idx: df.withColumnRenamed(old_columns[idx], new_columns[idx]),
range(len(old_columns)),
df,
)
# (d) Convert back to DynamicFrame
datasource = datasource.fromDF(df, glue_context, "datasource")
# Write DynamicFrame as Parquet
datasink = glue_context.write_dynamic_frame_from_options(
frame=datasource,
connection_type="s3",
connection_options={"path": "s3://path/to/target/prefix/"},
format="parquet",
)Run Code Online (Sandbox Code Playgroud)
块引用
| 归档时间: |
|
| 查看次数: |
4814 次 |
| 最近记录: |