sortWithinPartitions 是如何排序的?

Eug*_*Cuz 5 columnsorting snappy apache-spark orc

将 sortWithinPartitions 应用于 df 并将输出写入表后,我得到了一个结果,但我不知道如何解释。

df
.select($"type", $"id", $"time")
.sortWithinPartitions($"type", $"id", $"time")
Run Code Online (Sandbox Code Playgroud)

结果文件看起来有点像

1 a 5
2 b 1
1 a 6
2 b 2
1 a 7
2 b 3
1 a 8
2 b 4
Run Code Online (Sandbox Code Playgroud)

它实际上不是随机的,但也不像我期望的那样排序。即,首先按类型,然后是 id,然后是时间。如果我尝试在排序之前使用重新分区,那么我会得到我想要的结果。但由于某种原因,文件的重量增加了 5 倍(100GB 与 20GB)。

我正在向 hive orc 表写入数据,并将压缩设置为 snappy。

有谁知道为什么它是这样排序的,以及为什么重新分区会得到正确的顺序,但尺寸更大?

使用火花2.2。

wer*_*ner 16

sortWithinPartition的文档说明

返回一个新的数据集,其中每个分区均按给定表达式排序

考虑此函数的最简单方法是想象用作主要排序标准的第四列(分区 ID)。函数spark_partition_id()打印分区。

例如,如果您只有一个大分区(作为 Spark 用户,您永远不会这样做!),sortWithinPartition则按正常排序工作:

df.repartition(1)
  .sortWithinPartitions("type","id","time")
  .withColumn("partition", spark_partition_id())
  .show();
Run Code Online (Sandbox Code Playgroud)

印刷

+----+---+----+---------+
|type| id|time|partition|
+----+---+----+---------+
|   1|  a|   5|        0|
|   1|  a|   6|        0|
|   1|  a|   7|        0|
|   1|  a|   8|        0|
|   2|  b|   1|        0|
|   2|  b|   2|        0|
|   2|  b|   3|        0|
|   2|  b|   4|        0|
+----+---+----+---------+
Run Code Online (Sandbox Code Playgroud)

如果有更多分区,则结果仅在每个分区内排序:

df.repartition(4)
  .sortWithinPartitions("type","id","time")
  .withColumn("partition", spark_partition_id())
  .show();
Run Code Online (Sandbox Code Playgroud)

印刷

+----+---+----+---------+
|type| id|time|partition|
+----+---+----+---------+
|   2|  b|   1|        0|
|   2|  b|   3|        0|
|   1|  a|   5|        1|
|   1|  a|   6|        1|
|   1|  a|   8|        2|
|   2|  b|   2|        2|
|   1|  a|   7|        3|
|   2|  b|   4|        3|
+----+---+----+---------+
Run Code Online (Sandbox Code Playgroud)

为什么要使用sortWithPartition而不是排序sortWithPartition不会触发shuffle,因为数据仅在执行器内移动。sort但是会触发洗牌。因此sortWithPartition执行速度更快。如果数据按有意义的列进行分区,则在每个分区内进行排序可能就足够了。