coz*_*zos 5 hive amazon-s3 bucket apache-spark parquet
我正在尝试在相当大的数据集上使用 Spark 的bucketBy功能。
dataframe.write()
.format("parquet")
.bucketBy(500, bucketColumn1, bucketColumn2)
.mode(SaveMode.Overwrite)
.option("path", "s3://my-bucket")
.saveAsTable("my_table");
Run Code Online (Sandbox Code Playgroud)
问题是我的 Spark 集群有大约 500 个分区/任务/执行器(不确定术语),所以我最终得到的文件如下所示:
part-00001-{UUID}_00001.c000.snappy.parquet
part-00001-{UUID}_00002.c000.snappy.parquet
...
part-00001-{UUID}_00500.c000.snappy.parquet
part-00002-{UUID}_00001.c000.snappy.parquet
part-00002-{UUID}_00002.c000.snappy.parquet
...
part-00002-{UUID}_00500.c000.snappy.parquet
part-00500-{UUID}_00001.c000.snappy.parquet
part-00500-{UUID}_00002.c000.snappy.parquet
...
part-00500-{UUID}_00500.c000.snappy.parquet
Run Code Online (Sandbox Code Playgroud)
这是 500x500=250000 个分桶镶木地板文件!FileOutputCommitter将其提交到 S3需要很长时间。
有没有一种方法可以像 Hive 一样为每个存储桶生成一个文件?或者有更好的方法来处理这个问题吗?截至目前,我似乎必须在降低集群的并行性(减少编写器数量)或减少镶木地板文件的并行性(减少存储桶数量)之间做出选择。
谢谢
为了让每个最终存储桶获取 1 个文件,请执行以下操作。在将数据帧写入表重新分区之前,它使用与用于分桶的列完全相同的列,并将新分区的数量设置为等于您将在bucketBy中使用的桶数(或较小的数字,即数字的除数)桶的数量,尽管我不认为这里有理由使用较小的数量)。
在你的情况下,可能看起来像这样:
dataframe.repartition(500, bucketColumn1, bucketColumn2)
.write()
.format("parquet")
.bucketBy(500, bucketColumn1, bucketColumn2)
.mode(SaveMode.Overwrite)
.option("path", "s3://my-bucket")
.saveAsTable("my_table");
Run Code Online (Sandbox Code Playgroud)
在保存到现有表的情况下,您需要确保列的类型完全匹配(例如,如果您的列 X 在数据框中是 INT,但在您要插入到重新分区中的表中是 BIGINT) 500 个存储桶与 X 视为 BIGINT 的重新分区不匹配,最终将导致 500 个执行程序中的每一个执行程序再次写入 500 个文件)。
100% 清楚地讲 - 这种重新分区将在您的执行中添加另一个步骤,即收集 1 个执行器上每个存储桶的数据(因此,如果数据之前没有以相同的方式分区,则需要进行一次完整的数据重新洗牌)。我假设这正是您想要的。
在另一个答案的评论中也提到,如果您的分桶密钥倾斜,您需要为可能出现的问题做好准备。确实如此,但是如果您在加载表后所做的第一件事是聚合/连接您存储的相同列(对于选择这些列的桶)。相反,您会遇到延迟问题,并且只有在写入后尝试加载数据时才会看到倾斜。
在我看来,如果 Spark 提供一个设置,以便在写入分桶表之前(尤其是在插入现有表时)始终对数据进行重新分区,那就太好了。
| 归档时间: |
|
| 查看次数: |
4926 次 |
| 最近记录: |