Apr*_*ari 10 apache-spark apache-spark-sql
我想知道,spark.sql.autoBroadcastJoinThreshold即使连接方案使用数据集API连接而不是使用Spark SQL,属性是否可用于在所有工作节点上广播较小的表(同时进行连接).
如果我的大表是250 Gigs而Smaller是20 Gigs,我是否需要设置此配置:spark.sql.autoBroadcastJoinThreshold= 21 Gigs(可能)以便将整个表/发送Dataset到所有工作节点?
示例:
数据集API连接
val result = rawBigger.as("b").join(
broadcast(smaller).as("s"),
rawBigger(FieldNames.CAMPAIGN_ID) === smaller(FieldNames.CAMPAIGN_ID),
"left_outer"
)
Run Code Online (Sandbox Code Playgroud)SQL
select *
from rawBigger_table b, smaller_table s
where b.campign_id = s.campaign_id;
Run Code Online (Sandbox Code Playgroud)use*_*411 28
首先,spark.sql.autoBroadcastJoinThreshold和broadcast提示是不同的机制.即使autoBroadcastJoinThreshold被禁用,设置broadcast提示也会优先.使用默认设置:
spark.conf.get("spark.sql.autoBroadcastJoinThreshold")
Run Code Online (Sandbox Code Playgroud)
String = 10485760
Run Code Online (Sandbox Code Playgroud)
val df1 = spark.range(100)
val df2 = spark.range(100)
Run Code Online (Sandbox Code Playgroud)
Spark将使用autoBroadcastJoinThreshold并自动广播数据:
df1.join(df2, Seq("id")).explain
Run Code Online (Sandbox Code Playgroud)
== Physical Plan ==
*Project [id#0L]
+- *BroadcastHashJoin [id#0L], [id#3L], Inner, BuildRight
:- *Range (0, 100, step=1, splits=Some(8))
+- BroadcastExchange HashedRelationBroadcastMode(List(input[0, bigint, false]))
+- *Range (0, 100, step=1, splits=Some(8))
Run Code Online (Sandbox Code Playgroud)
当我们禁用自动广播Spark时会使用标准SortMergeJoin:
spark.conf.set("spark.sql.autoBroadcastJoinThreshold", -1)
df1.join(df2, Seq("id")).explain
Run Code Online (Sandbox Code Playgroud)
== Physical Plan ==
*Project [id#0L]
+- *SortMergeJoin [id#0L], [id#3L], Inner
:- *Sort [id#0L ASC NULLS FIRST], false, 0
: +- Exchange hashpartitioning(id#0L, 200)
: +- *Range (0, 100, step=1, splits=Some(8))
+- *Sort [id#3L ASC NULLS FIRST], false, 0
+- ReusedExchange [id#3L], Exchange hashpartitioning(id#0L, 200)
Run Code Online (Sandbox Code Playgroud)
但可以强制使用BroadcastHashJoin与broadcast提示:
df1.join(broadcast(df2), Seq("id")).explain
Run Code Online (Sandbox Code Playgroud)
== Physical Plan ==
*Project [id#0L]
+- *BroadcastHashJoin [id#0L], [id#3L], Inner, BuildRight
:- *Range (0, 100, step=1, splits=Some(8))
+- BroadcastExchange HashedRelationBroadcastMode(List(input[0, bigint, false]))
+- *Range (0, 100, step=1, splits=Some(8))
Run Code Online (Sandbox Code Playgroud)
SQL有自己的提示格式(类似于Hive中使用的格式):
df1.createOrReplaceTempView("df1")
df2.createOrReplaceTempView("df2")
spark.sql(
"SELECT /*+ MAPJOIN(df2) */ * FROM df1 JOIN df2 ON df1.id = df2.id"
).explain
Run Code Online (Sandbox Code Playgroud)
== Physical Plan ==
*BroadcastHashJoin [id#0L], [id#3L], Inner, BuildRight
:- *Range (0, 100, step=1, splits=8)
+- BroadcastExchange HashedRelationBroadcastMode(List(input[0, bigint, false]))
+- *Range (0, 100, step=1, splits=8)
Run Code Online (Sandbox Code Playgroud)
所以回答你的问题 - autoBroadcastJoinThreshold在使用DatasetAPI 时是适用的,但在使用显式broadcast提示时则不相关.
此外,广播大型物体不太可能提供任何性能提升,并且在实践中通常会降低性能并导致稳定性问题.请记住,广播对象必须首先获取驱动程序,然后发送给每个工作程序,最后加载到内存中.
Jac*_*ski 17
只是为了分享更多细节(从代码中)到@ user6910411的精彩答案.
引用源代码(格式化我的):
spark.sql.autoBroadcastJoinThreshold配置在执行连接时将广播到所有工作节点的表的最大大小(以字节为单位).
通过将此值设置为-1,可以禁用广播.
请注意,当前统计信息仅支持
ANALYZE TABLE COMPUTE STATISTICS noscan已运行命令的Hive Metastore表,以及基于文件的数据源表,其中统计信息直接计算在数据文件上.
spark.sql.autoBroadcastJoinThreshold默认为10M(即10L * 1024 * 1024),Spark将检查要使用的连接(请参阅JoinSelection执行计划策略).
有6种不同的连接选择,其中包括广播(使用BroadcastHashJoinExec或BroadcastNestedLoopJoinExec物理运营商).
BroadcastHashJoinExec 将在加入密钥并且以下其中一个成立时选择:
spark.sql.autoBroadcastJoinThresholdspark.sql.autoBroadcastJoinThresholdBroadcastNestedLoopJoinExec当没有加入密钥和上述条件之一时,将选择BroadcastHashJoinExec.
换句话说,Spark将自动选择正确的连接,包括BroadcastHashJoinExec基于spark.sql.autoBroadcastJoinThreshold属性(以及其他要求)以及连接类型.
| 归档时间: |
|
| 查看次数: |
18491 次 |
| 最近记录: |