Spark Sort Merge Join 是否涉及洗牌阶段?

Dag*_*ang 4 apache-spark

我对排序合并连接是否涉及排序阶段之前的洗牌阶段感到有点困惑。有的文章说可以,但是为什么不叫Shuffle Sort Merge Join呢,这样和Shuffle Hash Join更一致。

Vin*_*oba 11

TLDR:是的,Spark Sort Merge Join 涉及洗牌阶段。而且我们可以推测它不叫Shuffle Sort Merge Join,因为没有Broadcast Sort Merge Join可以区分。

通过示例了解 Spark Sort Merge Join

Spark 的排序合并连接算法使用 shuffle 在执行器之间分配数据。让我们看一个例子。

所以想象一下你想加入以下datasetA

ID 价值
3 a3
1 a1
4 a4
2 a2

具有以下内容datasetB

ID 价值
2 b2
4 b4
3 b3
1 b1

为此,您有一个位于 2 个执行器上的 Spark 应用程序,并使用排序合并策略。让我们详细说明每个步骤。

1. 根据分区函数对数据进行混洗

想象一下我们使用模 2配分函数。数据将在两个执行器上重新分配,如下所示:

执行者1

执行器 1 获取 id 的值为1模 2 的行,因此 ids13

数据集A
ID 值A
3 a3
1 a1
数据集B
ID 值B
3 b3
1 b1
执行者2

执行器 2 获取 id 的值0模 2 的行,因此 ids24

数据集A
ID 值A
4 a4
2 a2
数据集B
ID 值B
2 b2
4 b4

2. 对每个执行器上的数据集进行排序

执行者1
数据集A
ID 值A
1 a1
3 a3
数据集B
ID 值B
1 b1
3 b3
执行者2
数据集A
ID 值A
2 a2
4 a4
数据集B
ID 值B
2 b2
4 b4

3. 您执行加入

执行者1
ID 值A 值B
1 a1 b1
3 a3 b3
执行者2
ID 值A 值B
2 a2 b2
4 a4 b4

完整的最终数据集

ID 值A 值B
1 a1 b1
3 a3 b3
2 a2 b2
4 a4 b4

因此,当您使用排序合并策略时,首先会打乱数据。

代码参数

如果您查看Spark 的代码,您会看到SortMergeJoinExec执行排序合并连接的类扩展了特征ShuffledJoin。所以代码告诉你,执行Sort Merge Join时有一个shuffle。

那么为什么不叫Shuffled Sort Merge Join呢?

在经典的关系数据库管理系统中,由于一切都在同一执行器/服务器/机器上完成,因此您只需选择连接策略。主要的加盟策略有:

由于 Spark 是一个在多个执行器上运行的分布式计算框架,因此您首先必须将大数据集分割成更小的部分,这些部分可以独立分布在将应用这些连接算法的每个执行器上。为此,您有两种策略:

  • 广播:将最小的数据集复制到每个执行器。由于执行器拥有最小数据集的所有数据,因此它可以将其与最大数据集的部分数据连接起来,而无需依赖其他执行器。当执行器内存中最小的容量时,Spark 会选择此策略。
  • shuffle/repartition:将两个数据集的部分复制到每个执行器。对于每个执行器,您应该拥有两个数据集的正确部分,以防止在执行连接时必须从另一个执行器检索数据。

因此,当您要求 Spark 连接两个数据集时,Spark 需要选择两种策略:如何在执行器之间分配数据(广播或洗牌)以及如何执行实际连接(排序合并连接、哈希连接或嵌套循环连接)。这两种策略的组合给出了 Spark 的连接策略:

  • 广播哈希连接
  • 打乱哈希连接
  • 广播嵌套循环连接
  • 笛卡尔积
  • 排序合并连接

我们可以看到,Hash Join 是唯一结合了 Broadcast 和 Shuffle 两种不同的分发策略的 join 策略。所以我们可以猜测Shuffled添加前缀是为了避免 Hash Join with Broadcast 和 Hash Join with Shuffle 之间的混淆。

所以我们可以想象,由于没有Broadcast Sort Merge Join,所以不需要放置Shuffled前缀,这就是为什么Sort Merge Join策略不称为Shuffled Sort Merge Join。