the*_*evd · apache-spark, apache-spark-sql
I'm trying to force Spark to use ShuffleHashJoin by disabling BroadcastHashJoin and SortMergeJoin, but Spark always uses SortMergeJoin.
I'm using Spark 2.4.3.
import org.apache.log4j.{Level, Logger}
import org.apache.spark.sql.SparkSession

object ShuffleHashJoin {
  def main(args: Array[String]): Unit = {
    Logger.getLogger("org").setLevel(Level.ERROR)
    val spark = SparkSession.builder()
      .appName("ShuffleHashJoin")
      .master("local[*]")
      .getOrCreate()
    /*
     * Disable auto broadcasting of table and SortMergeJoin
     */
    spark.conf.set("spark.sql.autoBroadcastJoinThreshold", 0)
    spark.conf.set("spark.sql.join.preferSortMergeJoin", false)
    import spark.implicits._
    val dataset = Seq(
      (0, "playing"),
      (1, "with"),
      (2, "ShuffledHashJoinExec")).toDF("id", "token")
    dataset.join(dataset, Seq("id"), "inner").foreach(_ => ())
    // infinite loop to keep the program running to check Spark UI at 4040 port.
    while (true) {}
  }
}
Besides setting spark.sql.join.preferSortMergeJoin to false, Spark also checks the following condition (source code):
canBuildLocalHashMap(right || left)
|-> plan.stats.sizeInBytes < conf.autoBroadcastJoinThreshold * conf.numShufflePartitions
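You set spark.sql.autoBroadcastJoinThreshold to 0 programmatically, so the right-hand side is 0 * numShufflePartitions = 0. Since no plan has a negative size, the condition always evaluates to false.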
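A minimal pure-Scala model of the canBuildLocalHashMap condition makes the arithmetic concrete. This is a sketch, not Spark's code: the object name HashMapCheck and the byte sizes used below are hypothetical, and 200 stands for the default value of spark.sql.shuffle.partitions.

```scala
// Hypothetical model of the canBuildLocalHashMap check in Spark 2.4's
// JoinSelection; an illustration only, not Spark's actual code.
object HashMapCheck {
  // A side qualifies for a per-partition hash map only if its estimated size
  // is strictly below autoBroadcastJoinThreshold * numShufflePartitions.
  def canBuildLocalHashMap(sizeInBytes: BigInt,
                           autoBroadcastJoinThreshold: Long,
                           numShufflePartitions: Int): Boolean =
    sizeInBytes < BigInt(autoBroadcastJoinThreshold) * numShufflePartitions
}
```

With a threshold of 0 the bound is 0 * 200 = 0, so canBuildLocalHashMap(72, 0, 200) is false for this or any positive size; with a threshold of 2 the bound becomes 400 bytes, so canBuildLocalHashMap(72, 2, 200) is true.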
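The second condition is that one side must be much smaller than the other. As the comment in the Spark source puts it: building a hash map costs more than sorting, so a hash map should only be built on a table that is much smaller than the other one; since there are no statistics on the number of rows, the size in bytes is used as the estimate.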
muchSmaller(right, left) || muchSmaller(left, right)
|-> a.stats.sizeInBytes * 3 <= b.stats.sizeInBytes
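The muchSmaller heuristic can be modeled the same way. Again, this is a sketch with a hypothetical object name, not Spark's code. Note that the question joins dataset with itself, so both sides have the same size estimate and the check fails in both directions.

```scala
// Hypothetical model of the muchSmaller heuristic in Spark 2.4's JoinSelection;
// an illustration only, not Spark's actual code.
object SizeHeuristic {
  // `a` is "much smaller" than `b` when three times its estimated size
  // still does not exceed the estimated size of `b`.
  def muchSmaller(aSizeInBytes: BigInt, bSizeInBytes: BigInt): Boolean =
    aSizeInBytes * 3 <= bSizeInBytes
}
```

For example, muchSmaller(100, 300) holds, muchSmaller(101, 300) does not, and muchSmaller(72, 72), the self-join case, is false.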
In your example, two things are needed to make it work:
Lower the auto-broadcast threshold to a small non-zero value:
spark.conf.set("spark.sql.autoBroadcastJoinThreshold", 2)
Make one side of the join at least 3x larger than the other.
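Putting both requirements together, the sketch below is a simplified, hypothetical model of the order in which Spark 2.4's JoinSelection tries strategies for an inner equi-join: broadcast first, then ShuffleHashJoin building the right side, then the left side, otherwise SortMergeJoin. It assumes orderable join keys and no join hints (Spark has extra branches for both), and the names JoinChoiceModel, Strategy, etc. are illustrative, not Spark's API.

```scala
// Hypothetical, simplified model of strategy selection for an inner equi-join
// with orderable keys and no hints; an illustration only, not Spark's code.
object JoinChoiceModel {
  sealed trait Strategy
  case object BroadcastHash extends Strategy
  final case class ShuffledHash(buildLeft: Boolean) extends Strategy
  case object SortMerge extends Strategy

  def choose(leftSize: BigInt,
             rightSize: BigInt,
             autoBroadcastJoinThreshold: Long,
             numShufflePartitions: Int,
             preferSortMergeJoin: Boolean): Strategy = {
    // A side can be broadcast if its estimated size is within the threshold.
    def canBroadcast(size: BigInt): Boolean =
      size >= 0 && size <= autoBroadcastJoinThreshold
    // Per-partition hash map check: size < threshold * shuffle partitions.
    def canBuildLocalHashMap(size: BigInt): Boolean =
      size < BigInt(autoBroadcastJoinThreshold) * numShufflePartitions
    // The build side must be at least 3x smaller than the other side.
    def muchSmaller(a: BigInt, b: BigInt): Boolean = a * 3 <= b

    if (canBroadcast(rightSize) || canBroadcast(leftSize)) BroadcastHash
    else if (!preferSortMergeJoin && canBuildLocalHashMap(rightSize) &&
             muchSmaller(rightSize, leftSize)) ShuffledHash(buildLeft = false)
    else if (!preferSortMergeJoin && canBuildLocalHashMap(leftSize) &&
             muchSmaller(leftSize, rightSize)) ShuffledHash(buildLeft = true)
    else SortMerge
  }
}
```

With made-up sizes: equal sides (the self-join in the question) end in SortMerge whether the threshold is 0 or 2, while a 96-byte left side against a 320-byte right side with a threshold of 2 and 200 partitions gives ShuffledHash(buildLeft = true), mirroring the BuildLeft choice in the working example's plan.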
A working example:
spark.conf.set("spark.sql.autoBroadcastJoinThreshold", 2)
spark.conf.set("spark.sql.join.preferSortMergeJoin", false)
import spark.implicits._
val dataset = Seq(
(0, "playing"),
(1, "with"),
(2, "ShuffledHashJoinExec")).toDF("id", "token")
val right = Seq(
(0, "asdfghjklzxcvb"),
(1, "asdfghjklzxcvb"),
(2, "asdfghjklzxcvb"),
(3, "asdfghjklzxcvb"),
(4, "asdfghjklzxcvb"),
(5, "asdfghjklzxcvb"),
(6, "asdfghjklzxcvb"),
(7, "asdfghjklzxcvb"),
(8, "asdfghjklzxcvb"),
(9, "asdfghjklzxcvb")
)
.toDF("id", "token")
val joined = dataset.join(right, Seq("id"), "inner")
joined.explain(true)
*(1) Project [id#5, token#6, token#15]
+- ShuffledHashJoin [id#5], [id#14], Inner, BuildLeft
   :- Exchange hashpartitioning(id#5, 200)
   :  +- LocalTableScan [id#5, token#6]
   +- Exchange hashpartitioning(id#14, 200)
      +- LocalTableScan [id#14, token#15]