spark.sql.autoBroadcastJoinThreshold是否可以使用数据集的连接运算符进行连接?

Apr*_*ari 10 apache-spark apache-spark-sql

我想知道,spark.sql.autoBroadcastJoinThreshold即使连接方案使用数据集API连接而不是使用Spark SQL,属性是否可用于在所有工作节点上广播较小的表(同时进行连接).

如果我的大表是250 Gigs而Smaller是20 Gigs,我是否需要设置此配置:spark.sql.autoBroadcastJoinThreshold= 21 Gigs(可能)以便将整个表/发送Dataset到所有工作节点?

示例:

use*_*411 28

首先,spark.sql.autoBroadcastJoinThresholdbroadcast提示是不同的机制.即使autoBroadcastJoinThreshold被禁用,设置broadcast提示也会优先.使用默认设置:

spark.conf.get("spark.sql.autoBroadcastJoinThreshold")
Run Code Online (Sandbox Code Playgroud)
String = 10485760
Run Code Online (Sandbox Code Playgroud)
val df1 = spark.range(100)
val df2 = spark.range(100)
Run Code Online (Sandbox Code Playgroud)

Spark将使用autoBroadcastJoinThreshold并自动广播数据:

df1.join(df2, Seq("id")).explain
Run Code Online (Sandbox Code Playgroud)
== Physical Plan ==
*Project [id#0L]
+- *BroadcastHashJoin [id#0L], [id#3L], Inner, BuildRight
   :- *Range (0, 100, step=1, splits=Some(8))
   +- BroadcastExchange HashedRelationBroadcastMode(List(input[0, bigint, false]))
      +- *Range (0, 100, step=1, splits=Some(8))
Run Code Online (Sandbox Code Playgroud)

当我们禁用自动广播Spark时会使用标准SortMergeJoin:

spark.conf.set("spark.sql.autoBroadcastJoinThreshold", -1)
df1.join(df2, Seq("id")).explain
Run Code Online (Sandbox Code Playgroud)
== Physical Plan ==
*Project [id#0L]
+- *SortMergeJoin [id#0L], [id#3L], Inner
   :- *Sort [id#0L ASC NULLS FIRST], false, 0
   :  +- Exchange hashpartitioning(id#0L, 200)
   :     +- *Range (0, 100, step=1, splits=Some(8))
   +- *Sort [id#3L ASC NULLS FIRST], false, 0
      +- ReusedExchange [id#3L], Exchange hashpartitioning(id#0L, 200)
Run Code Online (Sandbox Code Playgroud)

但可以强制使用BroadcastHashJoinbroadcast提示:

df1.join(broadcast(df2), Seq("id")).explain
Run Code Online (Sandbox Code Playgroud)
== Physical Plan ==
*Project [id#0L]
+- *BroadcastHashJoin [id#0L], [id#3L], Inner, BuildRight
   :- *Range (0, 100, step=1, splits=Some(8))
   +- BroadcastExchange HashedRelationBroadcastMode(List(input[0, bigint, false]))
      +- *Range (0, 100, step=1, splits=Some(8))
Run Code Online (Sandbox Code Playgroud)

SQL有自己的提示格式(类似于Hive中使用的格式):

df1.createOrReplaceTempView("df1")
df2.createOrReplaceTempView("df2")

spark.sql(
 "SELECT  /*+ MAPJOIN(df2) */ * FROM df1 JOIN df2 ON df1.id = df2.id"
).explain
Run Code Online (Sandbox Code Playgroud)
== Physical Plan ==
*BroadcastHashJoin [id#0L], [id#3L], Inner, BuildRight
:- *Range (0, 100, step=1, splits=8)
+- BroadcastExchange HashedRelationBroadcastMode(List(input[0, bigint, false]))
   +- *Range (0, 100, step=1, splits=8)
Run Code Online (Sandbox Code Playgroud)

所以回答你的问题 - autoBroadcastJoinThreshold在使用DatasetAPI 时是适用的,但在使用显式broadcast提示时则不相关.

此外,广播大型物体不太可能提供任何性能提升,并且在实践中通常会降低性能并导致稳定性问题.请记住,广播对象必须首先获取驱动程序,然后发送给每个工作程序,最后加载到内存中.


Jac*_*ski 17

只是为了分享更多细节(从代码中)到@ user6910411的精彩答案.


引用源代码(格式化我的):

spark.sql.autoBroadcastJoinThreshold配置在执行连接时将广播到所有工作节点的表的最大大小(以字节为单位).

通过将此值设置为-1,可以禁用广播.

请注意,当前统计信息仅支持ANALYZE TABLE COMPUTE STATISTICS noscan已运行命令的Hive Metastore表,以及基于文件的数据源表,其中统计信息直接计算在数据文件上.

spark.sql.autoBroadcastJoinThreshold默认为10M(即10L * 1024 * 1024),Spark将检查要使用的连接(请参阅JoinSelection执行计划策略).

6种不同的连接选择,其中包括广播(使用BroadcastHashJoinExecBroadcastNestedLoopJoinExec物理运营商).

BroadcastHashJoinExec 将在加入密钥并且以下其中一个成立时选择:

  • 加入是CROSS,INNER,LEFT ANTI,LEFT OUTER,LEFT SEMI和右边加入方之一可以广播,即大小小于 spark.sql.autoBroadcastJoinThreshold
  • 加入是CROSS,INNER和RIGHT OUTER之一,左边加入方可以播放,即大小小于 spark.sql.autoBroadcastJoinThreshold

BroadcastNestedLoopJoinExec没有加入密钥和上述条件之一时,将选择BroadcastHashJoinExec.

换句话说,Spark将自动选择正确的连接,包括BroadcastHashJoinExec基于spark.sql.autoBroadcastJoinThreshold属性(以及其他要求)以及连接类型.

  • 当然!在这里:https://issues.apache.org/jira/browse/SPARK-26214 很高兴让 Spark 变得更好 (2认同)
  • “无连接键”是指您的查询未在 SQL 中使用“ON”或“Dataset.join”运算符中没有键。说得通?继续询问,直到您满意答案为止。我最终可能会再次查看来源,以确保我已经得到了正确的:) (2认同)