相关疑难解决方法(0)

Spark SQL - df.repartition和DataFrameWriter partitionBy之间的区别?

DataFrame repartition()和DataFrameWriter partitionBy()方法有什么区别?

我希望两者都习惯于"基于数据帧列分区数据"?或者有什么区别?

data-partitioning apache-spark-sql

42
推荐指数
3
解决办法
3万
查看次数

使用Spark的partitioningBy方法对S3中的大型偏斜数据集进行分区

我正在尝试使用Spark将较大的分区数据集写到磁盘上,并且该partitionBy算法在我尝试过的两种方法中都遇到了麻烦。

分区严重偏斜-有些分区很大,有些很小。

问题1

当我之前使用repartition时repartitionBy,Spark将所有分区写为单个文件,即使是大文件也是如此

val df = spark.read.parquet("some_data_lake")
df
  .repartition('some_col).write.partitionBy("some_col")
  .parquet("partitioned_lake")
Run Code Online (Sandbox Code Playgroud)

这需要永远执行,因为Spark不会并行编写大型分区。如果其中一个分区具有1TB的数据,Spark将尝试将整个1TB的数据作为单个文件写入。

问题2

当我不使用时repartition,Spark会写出太多文件。

此代码将写出疯狂的文件。

df.write.partitionBy("some_col").parquet("partitioned_lake")
Run Code Online (Sandbox Code Playgroud)

我在一个很小的8 GB数据子集上运行了此操作,Spark写入了85,000+个文件!

当我尝试在生产数据集上运行此文件时,一个包含1.3 GB数据的分区被写为3,100个文件。

我想要什么

我希望每个分区都写成1 GB文件。因此,具有7 GB数据的分区将作为7个文件被写出,而具有0.3 GB数据的分区将作为单个文件被写出。

我最好的前进道路是什么?

partitioning apache-spark apache-spark-sql

6
推荐指数
3
解决办法
631
查看次数

Spark 按列重新分区,每列具有动态分区数

如何根据列中的项目数对 DataFrame 进行分区。假设我们有一个包含 100 人的 DataFrame(列是first_namecountry),我们想为一个国家的每 10 人创建一个分区。

如果我们的数据集包含来自中国的 80 人、来自法国的 15 人和来自古巴的 5 人,那么我们将需要 8 个中国分区、2 个法国分区和 1 个古巴分区。

这是不起作用的代码:

  • df.repartition($"country"): 这将为中国创建 1 个分区,为法国创建一个分区,为古巴创建一个分区
  • df.repartition(8, $"country", rand):这会为每个国家创建最多 8 个分区,因此应该为中国创建 8 个分区,但法国和古巴分区未知。法国可能在 8 个分区中,而古巴最多可能在 5 个分区中。有关更多详细信息,请参阅此答案

这是repartition()文档:

重新分区文件

当我查看该repartition()方法时,我什至没有看到采用三个参数的方法,因此看起来其中一些行为没有记录。

有没有办法动态设置每列的分区数?这将使创建分区数据集变得更容易。

apache-spark

6
推荐指数
2
解决办法
2万
查看次数