小编San*_*ngh的帖子

使用 AWSglue 合并多个 parquet 文件并在 s3 中创建更大的 parquet 文件

我正在尝试使用 aws 胶水作业合并多个镶木地板文件。我知道这里提到的类似问题和可能的解决方案。我已经尝试过,但似乎不起作用。这是我的示例代码:

import sys
from awsglue.transforms import *
from awsglue.utils import getResolvedOptions
from pyspark.context import SparkContext
from awsglue.context import GlueContext
from awsglue.job import Job
from awsglue.dynamicframe import DynamicFrame

## @params: [JOB_NAME]
args = getResolvedOptions(sys.argv, ['JOB_NAME'])

sc = SparkContext()
glueContext = GlueContext(sc)
spark = glueContext.spark_session
job = Job(glueContext)

job.init(args['JOB_NAME'], args)

df = glueContext.create_dynamic_frame.from_options(connection_type="parquet", connection_options={'paths': ["s3://bucket-name/parquet/2021/02/15/15/"]})
partitioned_df=df.toDF().repartition(1)
partitioned_dynamic_df=DynamicFrame.fromDF(partitioned_df,glueContext,"partitioned_df")

datasink0=glueContext.write_dynamic_frame.from_options(frame=partitioned_dynamic_df,connection_type="s3", connection_options={'paths':["s3://bucket-name/output/"]}, format="parquet")

job.commit()
Run Code Online (Sandbox Code Playgroud)

我已经打印出来了partitioned_dynamic_df,它是所有镶木地板的组合 df。但我不断收到此错误消息,并且不知道如何解决。

Traceback (most recent call last):
File "/opt/amazon/spark/python/lib/pyspark.zip/pyspark/sql/utils.py", line 63, in deco return …
Run Code Online (Sandbox Code Playgroud)

scala amazon-s3 amazon-web-services apache-spark aws-glue

4
推荐指数
1
解决办法
7331
查看次数