DataFrame连接优化 - 广播哈希加入

NNa*_*med 32 dataframe apache-spark apache-spark-sql apache-spark-1.4

我试图有效地连接两个DataFrame,其中一个是大的,第二个是小一点.

有没有办法避免这一切洗牌?我无法设置autoBroadCastJoinThreshold,因为它只支持整数 - 我尝试广播的表略大于整数个字节.

有没有办法迫使广播忽略这个变量?

Ram*_*ram 68

广播哈希联接(类似于Mapreduce中的地图侧联接或地图侧联合):

在SparkSQL中,您可以看到通过调用执行的连接类型queryExecution.executedPlan.与核心Spark一样,如果其中一个表比另一个表小得多,则可能需要广播散列连接.您可以通过在加入之前调用方法来提示Spark SQL应该广播给定的DF broadcastDataFrame进行连接

例: largedataframe.join(broadcast(smalldataframe), "key")

在DWH术语中,大数据帧可能就像事实一样,
smalldataframe可能就像维度一样

正如我喜欢的书(HPS)所描述的那样.看下面有更好的理解.. 在此输入图像描述

注意:以上broadcast来自import org.apache.spark.sql.functions.broadcast不是SparkContext

Spark也会自动使用它spark.sql.conf.autoBroadcastJoinThreshold来确定是否应该广播一个表.

提示:请参阅DataFrame.explain()方法

def
explain(): Unit
Prints the physical plan to the console for debugging purposes.
Run Code Online (Sandbox Code Playgroud)

有没有办法迫使广播忽略这个变量?

sqlContext.sql("SET spark.sql.autoBroadcastJoinThreshold = -1")


注意 :

另一个类似开箱即用的注意事项wrt Hive(不是火花):使用MAPJOIN下面的hive提示可以实现类似的事情......

Select /*+ MAPJOIN(b) */ a.key, a.value from a join b on a.key = b.key

hive> set hive.auto.convert.join=true;
hive> set hive.auto.convert.join.noconditionaltask.size=20971520
hive> set hive.auto.convert.join.noconditionaltask=true;
hive> set hive.auto.convert.join.use.nonstaged=true;
hive> set hive.mapjoin.smalltable.filesize = 30000000; // default 25 mb made it as 30mb
Run Code Online (Sandbox Code Playgroud)


Seb*_*Piu 18

您可以通过使用提示广播数据帧 left.join(broadcast(right), ...)

  • 有机会提示广播连接到 SQL 语句吗? (3认同)
  • 这次广播的正确导入是什么?我发现这个符号“广播”不可解析。 (2认同)
  • 它位于org.apache.spark.sql.functions下,你需要spark 1.5.0或更新版本 (2认同)

小智 7

设置spark.sql.autoBroadcastJoinThreshold = -1将完全禁用广播。请参阅 Spark SQL、DataFrames 和数据集指南中的其他配置选项


dpe*_*ock 3

这是 Spark 的电流限制,请参阅SPARK-6235。2GB 限制也适用于广播变量。

您确定没有其他好的方法可以做到这一点,例如不同的分区?

否则,您可以通过手动创建多个广播变量(每个变量<2GB)来绕过它。