Spark SQL - df.repartition和DataFrameWriter partitionBy之间的区别?

Sha*_*kar 42 data-partitioning apache-spark-sql

DataFrame repartition()和DataFrameWriter partitionBy()方法有什么区别?

我希望两者都习惯于"基于数据帧列分区数据"?或者有什么区别?

con*_*lee 139

小心:我相信接受的答案是不对的!我很高兴你提出这个问题,因为这些类似命名的函数的行为在重要和意想不到的方面有所不同,这些方法在官方的spark文档中没有详细记录.

接受的答案的第一部分是正确的:调用df.repartition(COL, numPartitions=k)k使用基于散列的分区器创建包含分区的数据帧.COL这里定义了分区键 - 它可以是单个列或列列表.基于散列的分区器获取每个输入行的分区键,k通过类似的东西将其散列到分区空间中partition = hash(partitionKey) % k.这可以保证具有相同分区键的所有行最终位于同一分区中.但是,来自多个分区键的行也可以最终位于同一分区中(当分区键之间发生哈希冲突时),并且某些分区可能为空.

总之,非直观的方面df.repartition(COL, numPartitions=k)是这样的

  • 分区不会严格隔离分区键
  • 您的某些k分区可能为空,而其他分区可能包含多个分区键中的行

行为df.write.partitionBy是完全不同的,以许多用户不会期望的方式.假设您希望输出文件是日期分区的,并且您的数据跨越7天.我们还假设df有10个分区开头.运行时df.write.partitionBy('day'),您应该期望多少输出文件?答案是"这取决于".如果您的起始分区的每个分区都df包含每天的数据,那么答案是70.如果您的每个起始分区都df包含恰好一天的数据,则答案为10.

我们如何解释这种行为?运行时df.write,每个原始分区df都是独立编写的.也就是说,您的原始10个分区中的每个分区在"日期"列上单独进行子分区,并为每个子分区写入单独的文件.

我觉得这种行为很烦人,并且希望有一种方法可以在编写数据帧时进行全局重新分区.

  • +1.只是为了使partitionBy更好的例子更明确,你可以将它想象成:group by partition,col1,col2 ...这将为你提供将要写入的文件数量. (3认同)
  • 对于“官方火花文档中未充分记录的重要和意想不到的差异”,给出了很好的答案,并加了+50。我的问题:有没有办法破解您在最后一句话中描述的内容?像df.write()。repartition(COL).partitionBy(COL)之类的东西?我的目标是`partitionBy()`行为,但是文件大小和文件数量与我原来的大致相同。这可以轻松完成吗?`partitionBy(date)`=> 70个文件示例是相关的。我想要大约10个文件,每天一个,或者可能需要2到3个文件,以便有更多的数据。 (3认同)
  • @seth127 - 我有一些想法,但需要一些空间来解释。将您的问题写为正式问题,我会给您一个答案。 (2认同)
  • @conradlee好的,这里是:/sf/ask/3554310931/ 提前致谢! (2认同)
  • @thebluephantom 假设您有 1000 天的数据,并且您想在日期列上进行分区。所以你运行`df.repartition(df.date, 1000)`。许多人希望每个分区只包含一天的数据。但是,1000 个分区中的一些将是空的,而其他分区将包含多天的数据。许多人发现这不直观(也许您不这么认为,因此会造成混淆)。 (2认同)

Mar*_*usz 26

如果运行repartition(COL),则在计算期间更改分区 - 您将获得spark.sql.shuffle.partitions(默认值:200)分区.如果你然后打电话,.write你会得到一个包含许多文件的目录.

如果您运行.write.partitionBy(COL)结果,那么您将获得与COL中的唯一值一样多的目录.这样可以加速进一步的数据读取(如果您通过分区列进行筛选)并在存储上节省一些空间(从数据文件中删除分区列).

更新:请参阅@conradlee的回答.他详细解释了应用不同方法后目录结构的样子,以及两种情况下的结果数量.


Pow*_*ers 18

repartition()用于对内存中的partitionBy数据进行分区,用于对磁盘上的数据进行分区。它们经常结合使用。

二者repartition()partitionBy可以用于“基于数据帧列分区数据”,但repartition()在存储分区中的数据和partitionBy分区在磁盘上的数据。

重新分区()

让我们玩一些代码来更好地理解分区。假设您有以下 CSV 数据。

first_name,last_name,country
Ernesto,Guevara,Argentina
Vladimir,Putin,Russia
Maria,Sharapova,Russia
Bruce,Lee,China
Jack,Ma,China
Run Code Online (Sandbox Code Playgroud)

df.repartition(col("country")) 将在内存中按国家/地区重新分区数据。

让我们写出数据,以便我们可以检查每个内存分区的内容。

val outputPath = new java.io.File("./tmp/partitioned_by_country/").getCanonicalPath
df.repartition(col("country"))
  .write
  .csv(outputPath)
Run Code Online (Sandbox Code Playgroud)

以下是将数据写入磁盘的方式:

partitioned_by_country/
  part-00002-95acd280-42dc-457e-ad4f-c6c73be6226f-c000.csv
  part-00044-95acd280-42dc-457e-ad4f-c6c73be6226f-c000.csv
  part-00059-95acd280-42dc-457e-ad4f-c6c73be6226f-c000.csv
Run Code Online (Sandbox Code Playgroud)

每个文件包含一个国家的数据 - 该part-00059-95acd280-42dc-457e-ad4f-c6c73be6226f-c000.csv文件包含此中国数据,例如:

Bruce,Lee,China
Jack,Ma,China
Run Code Online (Sandbox Code Playgroud)

partitionBy()

让我们将数据写入磁盘,partitionBy看看文件系统输出有何不同。

这是将数据写出到磁盘分区的代码。

val outputPath = new java.io.File("./tmp/partitionedBy_disk/").getCanonicalPath
df
  .write
  .partitionBy("country")
  .csv(outputPath)
Run Code Online (Sandbox Code Playgroud)

以下是磁盘上的数据:

partitionedBy_disk/
  country=Argentina/
    part-00000-906f845c-ecdc-4b37-a13d-099c211527b4.c000.csv
  country=China/
    part-00000-906f845c-ecdc-4b37-a13d-099c211527b4.c000
  country=Russia/
    part-00000-906f845c-ecdc-4b37-a13d-099c211527b4.c000
Run Code Online (Sandbox Code Playgroud)

为什么要在磁盘上分区数据?

对磁盘上的数据进行分区可以使某些查询运行得更快。