使用glueContext.write_dynamic_frame.from_options 的AWS Glue 导出到镶木地板问题

Ale*_*hod 5 etl amazon-web-services pyspark aws-glue

我有以下问题。

以下代码由 AWS Glue 自动生成。

它的任务是从 Athena 获取数据(由 .csv @ S3 备份)并将数据转换为 Parquet。

该代码适用于参考航班数据集和一些相对较大的表(~100 Gb)。

但是,在大多数情况下,它返回错误,这并没有告诉我太多。

import sys
from awsglue.transforms import *
from awsglue.utils import getResolvedOptions
from pyspark.context import SparkConf, SparkContext
from awsglue.context import GlueContext
from awsglue.job import Job

## @params: [JOB_NAME]
args = getResolvedOptions(sys.argv, ['JOB_NAME'])

conf = (SparkConf()
    .set("spark.driver.maxResultSize", "8g"))

sc = SparkContext(conf=conf)
glueContext = GlueContext(sc)
spark = glueContext.spark_session
job = Job(glueContext)
job.init(args['JOB_NAME'], args)

datasource0 = glueContext.create_dynamic_frame.from_catalog(database = "XXX", table_name = "csv_impressions", transformation_ctx = "datasource0")

applymapping1 = ApplyMapping.apply(frame = datasource0, mappings = [("event time", "long", "event_time", "long"), ("user id", "string", "user_id", "string"), ("advertiser id", "long", "advertiser_id", "long"), ("campaign id", "long", "campaign_id", "long")], transformation_ctx = "applymapping1")

resolvechoice2 = ResolveChoice.apply(frame = applymapping1, choice = "make_struct", transformation_ctx = "resolvechoice2")

dropnullfields3 = DropNullFields.apply(frame = resolvechoice2, transformation_ctx = "dropnullfields3")

datasink4 = glueContext.write_dynamic_frame.from_options(frame = dropnullfields3, connection_type = "s3", connection_options = {"path": "s3://xxxx"}, format = "parquet", transformation_ctx = "datasink4")
job.commit()
Run Code Online (Sandbox Code Playgroud)

AWS Glue 识别的错误消息是:

调用 o72.pyWriteDynamicFrame 时出错

日志文件还包含:

由于阶段失败,作业中止:...写入行时任务失败

知道如何找出失败的原因吗?

或者它可能是什么?

Ale*_*hod 6

第 1 部分:识别问题

如何找到导致问题的原因的解决方案是将输出从.parquetto切换.csv并删除ResolveChoiceor DropNullFields(正如 Glue for 自动建议的那样.parquet):

datasink2 = glueContext.write_dynamic_frame.from_options(frame = applymapping1, connection_type = "s3", connection_options = {"path": "s3://xxxx"}, format = "csv", transformation_ctx = "datasink2")
job.commit()
Run Code Online (Sandbox Code Playgroud)

它产生了更详细的错误消息:

调用 o120.pyWriteDynamicFrame 时发生错误。作业因阶段失败而中止:阶段 0.0 中的任务 5 失败 4 次,最近一次失败:阶段 0.0 中丢失任务 5.3(TID 182,ip-172-31-78-99.ec2.internal,执行器 15):com. amazonaws.services.glue.util.FatalException:无法解析文件:xxxx1.csv.gz

错误消息中提到的文件xxxx1.csv.gz对于 Glue 来说似乎太大(.gzip未压缩时约为 100Mb 和约 350Mb .csv)。

第 2 部分:问题的真正根源和解决方法

正如第一部分中提到的,由于导出到,.csv可以识别错误的文件。

通过将 .csv 加载到 R 中进行进一步调查发现,其中一列包含单个string记录,而该列的所有其他值都是longNULL

在 R 中删除该值并将数据重新上传到 S3 后,问题就消失了。

注意#1:该列是string在 Athena 中声明的,因此我认为此行为是错误

注意#2:问题的本质不在于数据的大小。我已经成功处理了高达 200 Mb 的文件.csv.gz,相当于大约 600 Mb .csv

  • 那么你是如何解决这个问题的呢? (2认同)