将多个原始文件合并到单个拼花文件中

Dus*_*vic 3 etl apache-spark pyspark aws-glue

在S3中,我有大量的事件被yyyy/mm/dd/hh分区.每个分区都有大约80.000个原始文本文件.每个原始文件都有大约1.000个JSON格式的事件.

当我运行脚本来进行转换时:

datasource0 = glueContext.create_dynamic_frame.from_catalog(database=from_database,
                                                                table_name=from_table,
                                                                transformation_ctx="datasource0")
map0 = Map.apply(frame=datasource0, f=extract_data)
applymapping1 = ApplyMapping.apply(......)
applymapping1.toDF().write.mode('append').parquet(output_bucket, partitionBy=['year', 'month', 'day', 'hour'])
Run Code Online (Sandbox Code Playgroud)

我最终在分区上有大量小文件,名称如下:

part-00000-a5aa817d-482c-47d0-b804-81d793d3ac88.snappy.parquet
part-00001-a5aa817d-482c-47d0-b804-81d793d3ac88.snappy.parquet
part-00002-a5aa817d-482c-47d0-b804-81d793d3ac88.snappy.parquet
Run Code Online (Sandbox Code Playgroud)

每个都是1-3KB的大小.Number大致对应于我拥有的原始文件数.

我的印象是Glue将从目录中获取所有事件,按照我想要的方式对它们进行分区,并将每个分区存储在一个文件中.

我如何实现这一目标?

Sah*_*sai 6

您只需设置repartition(1)将所有分区中的数据随机分配到单个分区,该分区将在写入时生成单个输出文件.

applymapping1.toDF()
             .repartition(1)
             .write
             .mode('append')
             .parquet(output_bucket, partitionBy=['year', 'month', 'day', 'hour'])
Run Code Online (Sandbox Code Playgroud)

  • 我不认为 repartition(1) 会执行“为每个分区创建一个文件”的要求。repartition(1) 将创建 1 个分区,而不是每个分区创建一个文件。 (3认同)
  • coalesce(1) 和 repartition(1) 在功能上是相同的,因为整个数据必须在一个分区中传输,必须进行一次洗牌,并且这两个操作都是相同的, (2认同)