pyspark:有效地使partitionBy写入与原始表相同数量的总分区

set*_*127 22 apache-spark pyspark

我有一个与pyspark repartitionBy()函数相关的问题,我最初在这个问题的评论中发布了这个问题.我被要求将其作为一个单独的问题发布,所以这里是:

据我所知,df.partitionBy(COL)将每个值写入所有行COL到他们自己的文件夹,并且每个文件夹将(假设行以前通过其他键分布在所有分区上)具有与之前在文件中大致相同的文件数.整张桌子.我发现这种行为很烦人.如果我有一个包含500个分区的大表,并且我partitionBy(COL)在一些属性列上使用,我现在有100个文件夹,每个文件夹包含500个(现在非常小)文件.

我想要的是partitionBy(COL)行为,但文件大小和文件数量大致相同.

作为演示,上一个问题共享一个玩具示例,其中有一个包含10个分区的表,partitionBy(dayOfWeek)并且现在有70个文件,因为每个文件夹中有10个.我想要~10个文件,每天一个,可能需要2或3天,有更多的数据.

这可以轻松完成吗?喜欢的东西,df.write().repartition(COL).partitionBy(COL)好像它可能工作,但我担心,(在一个非常大的表,该表将被划分为多个文件夹的情况下),其首先将它结合到一些小的分区数之前做的partitionBy(COL)似乎是一个坏主意.

任何建议都非常感谢!

con*_*lee 22

你有几个选择.在我的下面的代码中,我假设你想要在镶木地板上写,但当然你可以改变它.

(1)df.repartition(numPartitions,*cols).write.partitionBy(*cols).parquet(writePath)

这将首先使用基于散列的分区来确保来自COL的有限数量的值进入每个分区.根据您选择的值numPartitions,某些分区可能为空,而其他分区可能拥挤值 - 对于任何不确定原因的人,请阅读此内容.然后,当您调用partitionByDataFrameWriter时,每个分区中的每个唯一值都将放在其自己的单个文件中.

警告:此方法可能导致不平衡的分区大小和不平衡的任务执行时间.当列中的值与许多行相关联时会发生这种情况(例如,城市列 - 纽约市的文件可能有很多行),而其他值则较少(例如,小城镇的值).

(2)df.sort(sortCols).write.parquet(writePath)

当您希望(1)您编写的文件大小几乎相等(2)对写入文件数的精确控制时,此选项非常有用.此方法首先对数据进行全局排序,然后查找将数据拆分为k均匀大小的分区的拆分,其中k在spark配置中指定spark.sql.shuffle.partitions.这意味着具有相同排序键值的所有值彼此相邻,但有时它们将跨越分割,并且位于不同的文件中.如果您的用例要求所有具有相同密钥的行位于同一分区中,则不要使用此方法.

还有两个额外的奖励:(1)通过对数据进行排序,它在磁盘上的大小通常可以减少(例如,按user_id对所有事件进行排序,然后按时间对列值进行大量重复,这有助于压缩)和(2) )如果您写入支持它的文件格式(如Parquet),则后续读者可以通过使用谓词下推最佳地读取数据,因为镶木地板编写器将在元数据中写入每列的MAX和MIN值,允许如果查询指定分区(最小,最大)范围之外的值,则读取器跳过行.

请注意,Spark中的排序比仅重新分区更昂贵,并且需要额外的阶段.在幕后,Spark将首先在一个阶段确定分裂,然后将数据混合到另一个阶段的分裂中.

(3)df.rdd.partitionBy(customPartitioner).toDF().write.parquet(writePath)

如果您在Scala上使用spark,那么您可以编写一个客户分区程序,它可以克服基于散列的分区程序的烦人问题.遗憾的是,pySpark不是一个选项.如果你真的想在pySpark中编写一个自定义分区器,我发现这是可能的,虽然有点尴尬,使用rdd.repartitionAndSortWithinPartitions:

df.rdd \
  .keyBy(sort_key_function) \  # Convert to key-value pairs
  .repartitionAndSortWithinPartitions(numPartitions=N_WRITE_PARTITIONS, 
                                      partitionFunc=part_func) \
  .values() # get rid of keys \
.toDF().write.parquet(writePath)
Run Code Online (Sandbox Code Playgroud)

也许其他人知道在pyspark中使用数据框上的自定义分区程序的更简单方法?

  • 是否可以将数据帧划分为多个 df(每一列的每个值一个 df,更改每个数据集的分区数量并单独写入它们?这样您可以控制每个最终文件夹中的文件数量。 (2认同)
  • 是的@SinanErdem,这绝对是可能的。您的驱动程序将必须循环遍历该列的每个单独值,并为每个值创建一个新的数据框和作业。因此,如果您不确定列中循环的值数量始终是可管理的,那么这是有风险的。 (2认同)
  • @Cal yes 控制(2)中的文件数量,'全局排序方法',你将 `spark.sql.shuffle.partitions` 配置参数设置为你想要的分区数。 (2认同)