Kan*_*est 5 scala apache-spark apache-spark-sql
如何在Spark中为Dataframe强制完全外部联接以使用Boradcast Hash Join?这是代码片段:
sparkConfiguration.set("spark.sql.autoBroadcastJoinThreshold", "1000000000")
val Result = BigTable.join(
org.apache.spark.sql.functions.broadcast(SmallTable),
Seq("X", "Y", "Z", "W", "V"),
"outer"
)
Run Code Online (Sandbox Code Playgroud)
我的SmallTable的大小比autoBroadcastJoinThreshold上面指定的小得多。另外,如果我使用inner,left_outer或right_outerjoin,则从DAG可视化中可以看到该join正在BroadcastHashJoin按预期使用。
但是,当我使用“ outer”作为联接类型时,spark SortMergeJoin出于某种未知原因决定使用该联接。有谁知道如何解决这个问题?根据我看到的左外部联接的性能,BroadcastHashJoin将有助于加快我的应用程序的运行速度。
\n\n\n由于某些未知原因,spark 决定使用 SortMergeJoin。\n 有人知道如何解决这个问题吗?
\n
原因: FullOuter(指任何关键字outer,full,fullouter)不支持广播哈希连接(又名地图侧连接)
如何证明这一点?
\n\n让我们举一个例子:
\n\n\n包 com.examples\n\n导入 org.apache.log4j.{Level, Logger}\n导入 org.apache.spark.internal.Logging\n导入 org.apache.spark.sql.SparkSession\n导入 org.apache.spark. sql.functions._\n\n/**\n * 使用示例数据连接示例和一些基础演示。\n *\n * @author : Ram Ghadiyaram\n */\nobject JoinExamples extends Logging {\n //关闭不需要的日志\n Logger.getLogger("org").setLevel(Level.OFF)\n val Spark: SparkSession = SparkSession.builder.config("spark.master", "local").getOrCreate;\n案例类 Person(名称: String, 年龄: Int, personid: Int)\n\n 案例类 Profile(名称: String, personId: Int, profileDescription: String)\n\n /**\n * main\n * \n * @param args Array[String]\n */\n def main(args: Array[String]): Unit = {\n spark.conf.set("spark.sql.join.preferSortMergeJoin", "false ")\n import Spark.implicits._\n\n spark.sparkContext.getConf.getAllWithPrefix("spark.sql").foreach(x => logInfo(x.toString()))\n /**\n * 使用案例类在此处创建 2 个数据帧,一个是 Person df1,另一个是 profile df2\n */\n val df1 = Spark.sqlContext.createDataFrame(\n spark.sparkContext.parallelize(\n Person("Sarath", 33) , 2)\n :: Person("KangarooWest", 30, 2)\n :: Person("Ravikumar Ramasamy", 34, 5)\n :: Person("Ram Ghadiyaram", 42, 9)\n : : Person("Ravi chandra Kancharla", 43, 9)\n :: Nil))\n\n\n val df2 = Spark.sqlContext.createDataFrame(\n Profile("Spark", 2, "SparkSQLMaster")\ n :: Profile("Spark", 5, "SparkGuru")\n :: Profile("Spark", 9, "DevHunter")\n :: Nil\n )\n\n // 你可以使用别名使用别名引用列名以提高可读性\n\n val df_asPerson = df1.as("dfperson")\n val df_asProfile = df2.as("dfprofile")\n /** *\n * 示例显示如何加入它们在数据帧级别\n * 下一个示例演示如何使用 sql 和 createOrReplaceTempView\n */\n val join_df = df_asPerson.join(\n Broadcast(df_asProfile)\n , col("dfperson.personid") === col( "dfprofile.personid")\n , "outer")\n val join = join_df.select(\n col("dfperson.name")\n , col("dfperson.age")\n , col("dfprofile .name")\n , col("dfprofile.profileDescription"))\n join.explain(false) // 它将显示使用了哪个连接\n join.show\n\n }\n\n}\n\n\n我尝试使用广播提示进行fullouter连接,但框架被忽略,SortMergeJoin下面是对此的解释计划。\n结果:
\n== 物理计划 ==\n*项目 [name#4、age#5、name#11、profileDescription#13]\n+- SortMergeJoin [personid#6]、[personid#12]、FullOuter\n :- *排序 [personid#6 ASC NULLS FIRST], false, 0\n : +- Exchange hashpartitioning(personid#6, 200)\n : +- *SerializeFromObject [staticinvoke(class org.apache.spark.unsafe.types.UTF8String) 、 StringType、fromString、assertnotnull(input[0, com.examples.JoinExamples$Person, true]).name, true) AS name#4、assertnotnull(input[0, com.examples.JoinExamples$Person, true])。年龄 AS 年龄#5,assertnotnull(input[0, com.examples.JoinExamples$Person, true]).personid AS personid#6]\n : +- 扫描ExternalRDDScan[obj#3]\n +- *排序 [personid #12 ASC NULL FIRST], false, 0\n +- Exchange hashpartitioning(personid#12, 200)\n +- LocalTableScan [name#11, personId#12, profileDescription#13]\n+------- -------------+---+-----+-----------------+\n| 姓名|年龄| 名称|个人资料描述|\n+--------------------+---+-----+---------------- -----+\n| 拉维库马尔·拉玛萨米| 34|火花| SparkGuru|\n| 拉姆·加迪亚拉姆| 42|火花| DevHunter|\n|Ravi chandra Kanc...| 43|火花| 开发猎人|\n| 萨拉特| 33|火花| SparkSQLMaster|\n| 袋鼠西| 30|火花| SparkSQLMaster|\n+--------------------+---+-----+--------------- ---+\n\n\n
\n\n\n从 Spark 2.3 开始,归并排序连接是 Spark 中的默认连接算法。\n 但是,可以通过使用内部参数\n \xe2\x80\x98spark.sql.join.preferSortMergeJoin\xe2\x80\x99 来关闭此算法,其中默认情况下为 true。
\n
除 join之外的其他情况fullouter...如果您不希望 Spark 在任何情况下使用 sortmergejoin,您可以设置以下属性。
sparkSession.conf.set("spark.sql.join.preferSortMergeJoin", "false")\nRun Code Online (Sandbox Code Playgroud)\n\n这是您不想使用的代码指令SparkStrategies.scala(负责将逻辑计划转换为零个或多个 SparkPlan)sortmergejoin。
此属性为 true 时,通过此PREFER_SORTMERGEJOINspark.sql.join.preferSortMergeJoin属性更喜欢排序合并连接而不是随机散列连接。
设置false意味着spark不能只选择broadcasthashjoin,它也可以是其他任何东西(例如shuffle hash join)。
下面的文档位于SparkStrategies.scalaie 之上object JoinSelection extends Strategy with PredicateHelper ...
SQLConf.AUTO_BROADCASTJOIN_THRESHOLD]] 阈值\n 或者如果该侧具有显式广播提示(例如,用户应用了\n [[ org.apache.spark.sql.functions.broadcast()] ] 函数到 a DataFrame),那么连接的这一侧将被广播,而另一侧将被流式传输,而不执行随机播放。如果连接双方都有资格广播,则Shuffle hash join:如果单个分区的平均大小足够小,可以构建哈希表。
排序合并:如果匹配的连接键是可排序的。
| 归档时间: |
|
| 查看次数: |
2087 次 |
| 最近记录: |