如何在唯一键上加入DataFrames时避免shuffle?

Dea*_*nLa 16 apache-spark apache-spark-sql

我有两个DataFrames AB:

  • A(id, info1, info2)大约2亿行的列
  • B只有id100万行的列

id列在两个DataFrame中都是唯一的.

我想要一个新的DataFrame,它过滤A只包含来自的值B.

如果B很小,我知道我会做的事情

A.filter($("id") isin B("id"))
Run Code Online (Sandbox Code Playgroud)

B仍然很大,所以并非所有它都适合作为广播变量.

而且我知道我可以使用

A.join(B, Seq("id"))
Run Code Online (Sandbox Code Playgroud)

但这不会利用这种独特性,我担心会造成不必要的洗牌.

实现该任务的最佳方法是什么?

Man*_*waj 20

关于如何优化连接的默认建议是:

  1. 如果可以,请使用广播联接(根据您的问题,您的表似乎很大,并且不能选择广播联接).Spark中的一个选项是执行广播连接(也就是hadoop世界中的地图侧连接).使用广播连接,您可以通过避免通过网络发送大表的所有数据,非常有效地将大表(事实)与相对较小的表(维度)连接起来.

    在连接运算符中使用时,可以使用广播功能标记要广播的数据集.它使用spark.sql.autoBroadcastJoinThreshold设置来控制在执行连接时将广播到所有工作节点的表的大小.

  2. 使用相同的分区程序.如果两个RDD具有相同的分区,则连接不会导致混洗.但请注意,缺少shuffle并不意味着不必在节点之间移动数据.两个RDD可能具有相同的分区(被共同分区),但是具有位于不同节点上的相应分区(不是共同定位的).这种情况仍然比洗牌更好,但要记住这一点.共址可以提高性能,但很难保证.

  3. 如果数据量很大和/或您的集群无法增长,甚至上面的(2)导致OOM,请使用两遍方法.首先,重新分区数据并使用分区表(dataframe.write.partitionBy())持久化.然后,在循环中串行连接子分区,"追加"到同一个最终结果表.


Arv*_*ula 18

如果您尚未在Dataframe A上应用任何分区程序,可能这将有助于您了解Join和Shuffle概念.

没有分区:

A.join(B, Seq("id"))
Run Code Online (Sandbox Code Playgroud)

默认情况下,此操作将散列两个数据帧的所有键,将具有相同键哈希值的元素通过网络发送到同一台计算机,然后将这些元素与该计算机上的相同键连接在一起.在这里你必须注意到两个数据帧都在网络中混乱. 在此输入图像描述

使用HashPartitioner:在构建A Dataframe时调用partitionBy(),Spark现在知道它是散列分区的,并且对它的join()调用将利用这些信息.特别是,当我们调用A.join(B,Seq("id"))时,Spark将仅对B RDD进行洗牌.由于B的数据少于A,因此您不需要在B上应用分区

例如:

 val A = sc.sequenceFile[id, info1, info2]("hdfs://...")
     .partitionBy(new HashPartitioner(100)) // Create 100 partitions
     .persist()
 A.join(B, Seq("id"))
Run Code Online (Sandbox Code Playgroud)

在此输入图像描述

参考资料来自Learning Spark书.

  • “partitionBy”的“成本”怎么样?它不会在 `join` 之前导致 shuffle 如此有效地成本(= shuffle 的数量)将是相同的吗? (4认同)
  • @JacekLaskowski我同意如果将其应用于现有RDD上,则partitionBy将花费您,但是在此处,当您构建Dataframe / RDD本身时,您将应用partitionBy,因此不会花费您额外的费用。 (2认同)
  • 出于好奇,如果你执行`B.join(A, Seq("id"))`,spark是否足够聪明,仍然只能对B RDD进行洗牌? (2认同)

bsh*_*141 5

如果我正确理解了您的问题,则希望使用B在每个节点上复制DataFrame的广播联接,以便半联接计算(即,使用id联接从DataFrame 进行过滤A)可以在每个节点上独立进行计算,而不必传达信息彼此之间来回移动(即,随机连接)。

您可以运行联接函数,这些函数显式调用广播联接以实现您要执行的操作:

import org.apache.spark.sql.functions.broadcast

val joinExpr = A.col("id") === B.col("id")

val filtered_A = A.join(broadcast(B), joinExpr, "left_semi")
Run Code Online (Sandbox Code Playgroud)

您可以运行filtered_A.explain()以验证是否正在使用广播联接。