Lar*_*ler 6 amazon-s3 apache-spark boto3 pyspark aws-glue
我有通过胶水爬行器爬行的 CSV 数据,最终出现在一张表中。
我正在尝试运行 ETL 作业,将磁盘上的数据重新分区到日期列的某些组件中。然后将 CSV 转换为 parquet。
即我的数据中有一个名为“date”的列,并且想要将数据分区为 s3 上的年、月、日分区。
我能够转换为 parquet 并使其在序列号值(不同的列)上正确分区,但它会将值“__HIVE_DEFAULT_PARTITION__”放入日期相关分区的所有值年、月和日中。
我可以对其他列(例如序列号)进行分区,但年/月/日不在原始数据集中,因此我的方法是从日期列创建值作为数据中的新列设置并告诉 write_dynamic_frame 函数按列分区,但这不起作用。
一般来说,我对 Spark/pyspark 和glue 很陌生,所以我很可能错过了一些简单的东西。
感谢任何提供帮助的人。
import sys
from awsglue.transforms import *
from awsglue.utils import getResolvedOptions
from pyspark.sql import functions as F
from pyspark.context import SparkContext
from awsglue.context import GlueContext
from awsglue.dynamicframe import DynamicFrame
from awsglue.job import Job
args = getResolvedOptions(sys.argv, ['JOB_NAME'])
sc = SparkContext()
glueContext = GlueContext(sc)
spark = glueContext.spark_session
job = Job(glueContext)
job.init(args['JOB_NAME'], args)
datasource0 = glueContext.create_dynamic_frame.from_catalog(database = "my_database", table_name = "my_table", transformation_ctx = "datasource0")
applymapping1 = ApplyMapping.apply(frame = datasource0, mappings = [("date", "date", "date", "date"), ("serial-number", "string", "serial-number", "string")], transformation_ctx = "applymapping1")
resolvechoice2 = ResolveChoice.apply(frame = applymapping1, choice = "make_struct", transformation_ctx = "resolvechoice2")
dropnullfields3 = DropNullFields.apply(frame = resolvechoice2, transformation_ctx = "dropnullfields3")
to_spark_df4 = dropnullfields3.toDF()
with_file_name_df5 = to_spark_df4.withColumn("input_file_name", F.input_file_name()).withColumn('year', F.year(F.col("date").cast("date"))).withColumn('month', F.month(F.col("date").cast("date"))).withColumn('day', F.dayofmonth(F.col("date").cast("date")))
back_to_glue_df8 = DynamicFrame.fromDF(with_file_name_df5, glueContext, "back_to_glue_df8")
datasink4 = glueContext.write_dynamic_frame.from_options(frame = back_to_glue_df8, connection_type = "s3", connection_options = {"path": "s3://output/path","partitionKeys": ["serial-number","year", "month","day"]}, format = "parquet", transformation_ctx = "datasink4")
job.commit()
Run Code Online (Sandbox Code Playgroud)
结果是我在 s3 中的键最终看起来像这样:
serial-number=1234567890/year=__HIVE_DEFAULT_PARTITION__/month=__HIVE_DEFAULT_PARTITION__/day=__HIVE_DEFAULT_PARTITION__/part-01571-273027e4-72ba-45ff-ac15-c0bb2f342e58.c000.snappy.parquet
Run Code Online (Sandbox Code Playgroud)
更新:已编辑格式
我从事的工作与你的非常相似。我希望您现在能够解决这个问题,但无论如何,这是解决您困境的方法:
from pyspark.sql.functions import year, month, dayofmonth
###### rest of your code until ApplyMapping included ######
# add year, month & day columns, non zero-padded
df = df.toDF()
df = df.withColumn('year', year(df.date))\
.withColumn('month', month(df.date))\
.withColumn('day', dayofmonth(df.date))
Run Code Online (Sandbox Code Playgroud)
如果您需要在 Athena 上运行查询并选择日期范围,我建议您避免使用嵌套分区(即年 -> 月 -> 天),而是使用平面分区架构。这样做的原因是查询变得更容易编写。下面是获取平面模式的 python 代码:
from pyspark.sql.functions import date_format
###### rest of your code until ApplyMapping included ######
df = df.toDF()
df = df.withColumn('date_2', date_format(df.date, 'yyyy-MM-dd'))
# date_2 is because column "date" already exists,
# but we want the partitioning one to be in a string format.
# You can later drop the original column if you wish.
Run Code Online (Sandbox Code Playgroud)
假设现在您要查询 2020 年 3 月 15 日至 4 月 3 日的数据。
以下是基于您选择的分区架构的 SQL 查询。
SELECT item_1, item_2
FROM my_table
WHERE year = 2020
AND (
(month = 3 AND day >= 15)
OR (month = 4 AND day <= 3)
)
Run Code Online (Sandbox Code Playgroud)
SELECT item_1, item_2
FROM my_table
WHERE date BETWEEN '2020-03-15' AND '2020-04-3'
Run Code Online (Sandbox Code Playgroud)
此外,鉴于您的“日期”列存储为字符串,您将能够使用LIKE
运算符运行查询。
例如,如果您想查询数据库中每年四月的所有数据,您可以执行以下操作:
SELECT item_1, item_2
FROM my_table
WHERE date LIKE '%-04-%'
Run Code Online (Sandbox Code Playgroud)
归档时间: |
|
查看次数: |
6120 次 |
最近记录: |