相关疑难解决方法(0)

pyspark:有效地使partitionBy写入与原始表相同数量的总分区

我有一个与pyspark repartitionBy()函数相关的问题,我最初在这个问题的评论中发布了这个问题.我被要求将其作为一个单独的问题发布,所以这里是:

据我所知,df.partitionBy(COL)将每个值写入所有行COL到他们自己的文件夹,并且每个文件夹将(假设行以前通过其他键分布在所有分区上)具有与之前在文件中大致相同的文件数.整张桌子.我发现这种行为很烦人.如果我有一个包含500个分区的大表,并且我partitionBy(COL)在一些属性列上使用,我现在有100个文件夹,每个文件夹包含500个(现在非常小)文件.

我想要的是partitionBy(COL)行为,但文件大小和文件数量大致相同.

作为演示,上一个问题共享一个玩具示例,其中有一个包含10个分区的表,partitionBy(dayOfWeek)并且现在有70个文件,因为每个文件夹中有10个.我想要~10个文件,每天一个,可能需要2或3天,有更多的数据.

这可以轻松完成吗?喜欢的东西,df.write().repartition(COL).partitionBy(COL)好像它可能工作,但我担心,(在一个非常大的表,该表将被划分为多个文件夹的情况下),其首先将它结合到一些小的分区数之前做的partitionBy(COL)似乎是一个坏主意.

任何建议都非常感谢!

apache-spark pyspark

22
推荐指数
1
解决办法
5787
查看次数

使用Spark的partitioningBy方法对S3中的大型偏斜数据集进行分区

我正在尝试使用Spark将较大的分区数据集写到磁盘上,并且该partitionBy算法在我尝试过的两种方法中都遇到了麻烦。

分区严重偏斜-有些分区很大,有些很小。

问题1

当我之前使用repartition时repartitionBy,Spark将所有分区写为单个文件,即使是大文件也是如此

val df = spark.read.parquet("some_data_lake")
df
  .repartition('some_col).write.partitionBy("some_col")
  .parquet("partitioned_lake")
Run Code Online (Sandbox Code Playgroud)

这需要永远执行,因为Spark不会并行编写大型分区。如果其中一个分区具有1TB的数据,Spark将尝试将整个1TB的数据作为单个文件写入。

问题2

当我不使用时repartition,Spark会写出太多文件。

此代码将写出疯狂的文件。

df.write.partitionBy("some_col").parquet("partitioned_lake")
Run Code Online (Sandbox Code Playgroud)

我在一个很小的8 GB数据子集上运行了此操作,Spark写入了85,000+个文件!

当我尝试在生产数据集上运行此文件时,一个包含1.3 GB数据的分区被写为3,100个文件。

我想要什么

我希望每个分区都写成1 GB文件。因此,具有7 GB数据的分区将作为7个文件被写出,而具有0.3 GB数据的分区将作为单个文件被写出。

我最好的前进道路是什么?

partitioning apache-spark apache-spark-sql

6
推荐指数
3
解决办法
631
查看次数