use*_*848 8 apache-spark apache-spark-sql
我正在尝试使用SparkSQL在数据帧上执行广播散列连接,如下所示:https://docs.cloud.databricks.com/docs/latest/databricks_guide/06%20Spark%20SQL%20%26%20DataFrames/05% 20BroadcastHashJoin%20-%20scala.html
在该示例中,(small)DataFrame通过saveAsTable持久化,然后通过spark SQL(即via)进行连接sqlContext.sql("..."))
我遇到的问题是我需要使用sparkSQL API来构造我的SQL(我还要加入~50个带有ID列表的表,并且不想手工编写SQL).
How do I tell spark to use the broadcast hash join via the API? The issue is that if I load the ID list (from the table persisted via `saveAsTable`) into a `DataFrame` to use in the join, it isn't clear to me if Spark can apply the broadcast hash join.
Run Code Online (Sandbox Code Playgroud)
zer*_*323 23
您可以明确地将其标记DataFrame为足够小以便使用broadcast函数进行广播:
Python:
from pyspark.sql.functions import broadcast
small_df = ...
large_df = ...
large_df.join(broadcast(small_df), ["foo"])
Run Code Online (Sandbox Code Playgroud)
或广播提示(Spark> = 2.2):
large_df.join(small_df.hint("broadcast"), ["foo"])
Run Code Online (Sandbox Code Playgroud)
斯卡拉:
import org.apache.spark.sql.functions.broadcast
val smallDF: DataFrame = ???
val largeDF: DataFrame = ???
largeDF.join(broadcast(smallDF), Seq("foo"))
Run Code Online (Sandbox Code Playgroud)
或广播提示(Spark> = 2.2):
largeDF.join(smallDF.hint("broadcast"), Seq("foo"))
Run Code Online (Sandbox Code Playgroud)
SQL
您可以使用提示(Spark> = 2.2):
SELECT /*+ MAPJOIN(small) */ *
FROM large JOIN small
ON large.foo = small.foo
Run Code Online (Sandbox Code Playgroud)
要么
SELECT /*+ BROADCASTJOIN(small) */ *
FROM large JOIN small
ON large.foo = small.foo
Run Code Online (Sandbox Code Playgroud)
要么
SELECT /*+ BROADCAST(small) */ *
FROM large JOIN small
ON larger.foo = small.foo
Run Code Online (Sandbox Code Playgroud)
R(SparkR):
使用hint(Spark> = 2.2):
join(large, hint(small, "broadcast"), large$foo == small$foo)
Run Code Online (Sandbox Code Playgroud)
用broadcast(Spark> = 2.3)
join(large, broadcast(small), large$foo == small$foo)
Run Code Online (Sandbox Code Playgroud)
注意:
如果其中一个结构相对较小,则广播连接很有用.否则它可能比完全洗牌要贵得多.
jon_rdd = sqlContext.sql( "select * from people_in_india p
join states s
on p.state = s.name")
jon_rdd.toDebugString() / join_rdd.explain() :
Run Code Online (Sandbox Code Playgroud)
shuffledHashJoin:
对于每个州,印度的所有数据都将被改组成只有29个密钥.问题:不均匀的分片.与29个输出分区有限的并行性.
broadcaseHashJoin:
将小RDD广播到所有工作节点.大型rdd的并行性仍然保持不变,甚至不需要洗牌.
PS:图像可能很丑,但信息量很大.