Sha*_*kar 42 data-partitioning apache-spark-sql
DataFrame repartition()和DataFrameWriter partitionBy()方法有什么区别?
我希望两者都习惯于"基于数据帧列分区数据"?或者有什么区别?
con*_*lee 139
小心:我相信接受的答案是不对的!我很高兴你提出这个问题,因为这些类似命名的函数的行为在重要和意想不到的方面有所不同,这些方法在官方的spark文档中没有详细记录.
接受的答案的第一部分是正确的:调用df.repartition(COL, numPartitions=k)将k使用基于散列的分区器创建包含分区的数据帧.COL这里定义了分区键 - 它可以是单个列或列列表.基于散列的分区器获取每个输入行的分区键,k通过类似的东西将其散列到分区空间中partition = hash(partitionKey) % k.这可以保证具有相同分区键的所有行最终位于同一分区中.但是,来自多个分区键的行也可以最终位于同一分区中(当分区键之间发生哈希冲突时),并且某些分区可能为空.
总之,非直观的方面df.repartition(COL, numPartitions=k)是这样的
k分区可能为空,而其他分区可能包含多个分区键中的行行为df.write.partitionBy是完全不同的,以许多用户不会期望的方式.假设您希望输出文件是日期分区的,并且您的数据跨越7天.我们还假设df有10个分区开头.运行时df.write.partitionBy('day'),您应该期望多少输出文件?答案是"这取决于".如果您的起始分区的每个分区都df包含每天的数据,那么答案是70.如果您的每个起始分区都df包含恰好一天的数据,则答案为10.
我们如何解释这种行为?运行时df.write,每个原始分区df都是独立编写的.也就是说,您的原始10个分区中的每个分区在"日期"列上单独进行子分区,并为每个子分区写入单独的文件.
我觉得这种行为很烦人,并且希望有一种方法可以在编写数据帧时进行全局重新分区.
Mar*_*usz 26
如果运行repartition(COL),则在计算期间更改分区 - 您将获得spark.sql.shuffle.partitions(默认值:200)分区.如果你然后打电话,.write你会得到一个包含许多文件的目录.
如果您运行.write.partitionBy(COL)结果,那么您将获得与COL中的唯一值一样多的目录.这样可以加速进一步的数据读取(如果您通过分区列进行筛选)并在存储上节省一些空间(从数据文件中删除分区列).
更新:请参阅@conradlee的回答.他详细解释了应用不同方法后目录结构的样子,以及两种情况下的结果数量.
Pow*_*ers 18
repartition()用于对内存中的partitionBy数据进行分区,用于对磁盘上的数据进行分区。它们经常结合使用。
二者repartition()并partitionBy可以用于“基于数据帧列分区数据”,但repartition()在存储分区中的数据和partitionBy分区在磁盘上的数据。
重新分区()
让我们玩一些代码来更好地理解分区。假设您有以下 CSV 数据。
first_name,last_name,country
Ernesto,Guevara,Argentina
Vladimir,Putin,Russia
Maria,Sharapova,Russia
Bruce,Lee,China
Jack,Ma,China
Run Code Online (Sandbox Code Playgroud)
df.repartition(col("country")) 将在内存中按国家/地区重新分区数据。
让我们写出数据,以便我们可以检查每个内存分区的内容。
val outputPath = new java.io.File("./tmp/partitioned_by_country/").getCanonicalPath
df.repartition(col("country"))
.write
.csv(outputPath)
Run Code Online (Sandbox Code Playgroud)
以下是将数据写入磁盘的方式:
partitioned_by_country/
part-00002-95acd280-42dc-457e-ad4f-c6c73be6226f-c000.csv
part-00044-95acd280-42dc-457e-ad4f-c6c73be6226f-c000.csv
part-00059-95acd280-42dc-457e-ad4f-c6c73be6226f-c000.csv
Run Code Online (Sandbox Code Playgroud)
每个文件包含一个国家的数据 - 该part-00059-95acd280-42dc-457e-ad4f-c6c73be6226f-c000.csv文件包含此中国数据,例如:
Bruce,Lee,China
Jack,Ma,China
Run Code Online (Sandbox Code Playgroud)
partitionBy()
让我们将数据写入磁盘,partitionBy看看文件系统输出有何不同。
这是将数据写出到磁盘分区的代码。
val outputPath = new java.io.File("./tmp/partitionedBy_disk/").getCanonicalPath
df
.write
.partitionBy("country")
.csv(outputPath)
Run Code Online (Sandbox Code Playgroud)
以下是磁盘上的数据:
partitionedBy_disk/
country=Argentina/
part-00000-906f845c-ecdc-4b37-a13d-099c211527b4.c000.csv
country=China/
part-00000-906f845c-ecdc-4b37-a13d-099c211527b4.c000
country=Russia/
part-00000-906f845c-ecdc-4b37-a13d-099c211527b4.c000
Run Code Online (Sandbox Code Playgroud)
为什么要在磁盘上分区数据?
对磁盘上的数据进行分区可以使某些查询运行得更快。
| 归档时间: |
|
| 查看次数: |
29818 次 |
| 最近记录: |