Dea*_*nLa 16 apache-spark apache-spark-sql
我有两个DataFrames A和B:
A有(id, info1, info2)大约2亿行的列B只有id100万行的列该id列在两个DataFrame中都是唯一的.
我想要一个新的DataFrame,它过滤A只包含来自的值B.
如果B很小,我知道我会做的事情
A.filter($("id") isin B("id"))
Run Code Online (Sandbox Code Playgroud)
但B仍然很大,所以并非所有它都适合作为广播变量.
而且我知道我可以使用
A.join(B, Seq("id"))
Run Code Online (Sandbox Code Playgroud)
但这不会利用这种独特性,我担心会造成不必要的洗牌.
实现该任务的最佳方法是什么?
Man*_*waj 20
关于如何优化连接的默认建议是:
如果可以,请使用广播联接(根据您的问题,您的表似乎很大,并且不能选择广播联接).Spark中的一个选项是执行广播连接(也就是hadoop世界中的地图侧连接).使用广播连接,您可以通过避免通过网络发送大表的所有数据,非常有效地将大表(事实)与相对较小的表(维度)连接起来.
在连接运算符中使用时,可以使用广播功能标记要广播的数据集.它使用spark.sql.autoBroadcastJoinThreshold设置来控制在执行连接时将广播到所有工作节点的表的大小.
使用相同的分区程序.如果两个RDD具有相同的分区,则连接不会导致混洗.但请注意,缺少shuffle并不意味着不必在节点之间移动数据.两个RDD可能具有相同的分区(被共同分区),但是具有位于不同节点上的相应分区(不是共同定位的).这种情况仍然比洗牌更好,但要记住这一点.共址可以提高性能,但很难保证.
如果数据量很大和/或您的集群无法增长,甚至上面的(2)导致OOM,请使用两遍方法.首先,重新分区数据并使用分区表(dataframe.write.partitionBy())持久化.然后,在循环中串行连接子分区,"追加"到同一个最终结果表.
Arv*_*ula 18
如果您尚未在Dataframe A上应用任何分区程序,可能这将有助于您了解Join和Shuffle概念.
没有分区:
A.join(B, Seq("id"))
Run Code Online (Sandbox Code Playgroud)
默认情况下,此操作将散列两个数据帧的所有键,将具有相同键哈希值的元素通过网络发送到同一台计算机,然后将这些元素与该计算机上的相同键连接在一起.在这里你必须注意到两个数据帧都在网络中混乱.

使用HashPartitioner:在构建A Dataframe时调用partitionBy(),Spark现在知道它是散列分区的,并且对它的join()调用将利用这些信息.特别是,当我们调用A.join(B,Seq("id"))时,Spark将仅对B RDD进行洗牌.由于B的数据少于A,因此您不需要在B上应用分区
例如:
val A = sc.sequenceFile[id, info1, info2]("hdfs://...")
.partitionBy(new HashPartitioner(100)) // Create 100 partitions
.persist()
A.join(B, Seq("id"))
Run Code Online (Sandbox Code Playgroud)
参考资料来自Learning Spark书.
如果我正确理解了您的问题,则希望使用B在每个节点上复制DataFrame的广播联接,以便半联接计算(即,使用id联接从DataFrame 进行过滤A)可以在每个节点上独立进行计算,而不必传达信息彼此之间来回移动(即,随机连接)。
您可以运行联接函数,这些函数显式调用广播联接以实现您要执行的操作:
import org.apache.spark.sql.functions.broadcast
val joinExpr = A.col("id") === B.col("id")
val filtered_A = A.join(broadcast(B), joinExpr, "left_semi")
Run Code Online (Sandbox Code Playgroud)
您可以运行filtered_A.explain()以验证是否正在使用广播联接。