Spark SQL broadcast hash join

use*_*848 8 apache-spark apache-spark-sql

I am trying to perform a broadcast hash join on DataFrames using Spark SQL, as documented here: https://docs.cloud.databricks.com/docs/latest/databricks_guide/06%20Spark%20SQL%20%26%20DataFrames/05%20BroadcastHashJoin%20-%20scala.html

In that example, the (small) DataFrame is persisted via saveAsTable and then joined through Spark SQL (i.e. via sqlContext.sql("...")).

The problem I have is that I need to use the Spark SQL API to construct my SQL (I am joining ~50 tables with an ID list, and don't want to write the SQL by hand).

How do I tell spark to use the broadcast hash join via the API?  The issue is that if I load the ID list (from the table persisted via `saveAsTable`) into a `DataFrame` to use in the join, it isn't clear to me if Spark can apply the broadcast hash join.

zer*_*323 23

You can explicitly mark the DataFrame as small enough to be broadcast by using the broadcast function:

Python:

from pyspark.sql.functions import broadcast

small_df = ...
large_df = ...

large_df.join(broadcast(small_df), ["foo"])

or with the broadcast hint (Spark >= 2.2):

large_df.join(small_df.hint("broadcast"), ["foo"])

Scala:

import org.apache.spark.sql.functions.broadcast

val smallDF: DataFrame = ???
val largeDF: DataFrame = ???

largeDF.join(broadcast(smallDF), Seq("foo"))

or with the broadcast hint (Spark >= 2.2):

largeDF.join(smallDF.hint("broadcast"), Seq("foo"))

SQL

You can use hints (Spark >= 2.2):

SELECT /*+ MAPJOIN(small) */ * 
FROM large JOIN small
ON large.foo = small.foo

or

SELECT /*+  BROADCASTJOIN(small) */ * 
FROM large JOIN small
ON large.foo = small.foo

or

SELECT /*+ BROADCAST(small) */ * 
FROM large JOIN small
ON large.foo = small.foo

R (SparkR):

Using hint (Spark >= 2.2):

join(large, hint(small, "broadcast"), large$foo == small$foo)

Using broadcast (Spark >= 2.3):

join(large, broadcast(small), large$foo == small$foo)

Note:

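A broadcast join is useful if one of the structures is relatively small. Otherwise it can be significantly more expensive than a full shuffle.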

  • Thanks! After some experimentation, I see that `smallDF.join(largeDF)` does not do a broadcast hash join, but `largeDF.join(smallDF)` does. (2 upvotes)

Vam*_*ddy 5

join_rdd = sqlContext.sql("""select * from people_in_india p
                            join states s
                            on p.state = s.name""")

# Inspect the plan to see which join strategy was chosen
# (join_rdd.rdd.toDebugString() shows the underlying RDD lineage instead):
join_rdd.explain()

shuffledHashJoin:
All the data for India is shuffled onto only 29 keys, one per state. Problems: uneven sharding, and parallelism limited to 29 output partitions.
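To make the limited-parallelism point concrete, here is a minimal pure-Python sketch (not Spark code). It assumes records are routed to partitions by hashing the join key; Python's built-in `hash` stands in for Spark's partitioner, and the function name, the `state_N` key names and the row counts are all hypothetical.

```python
from collections import Counter

def shuffle_partition_sizes(keys, num_partitions, partition_of=hash):
    """Count how many records land in each shuffle partition."""
    return Counter(partition_of(k) % num_partitions for k in keys)

# Hypothetical skewed data: 29 distinct state keys, one of them far larger.
keys = ["state_0"] * 1000 + [f"state_{i}" for i in range(1, 29) for _ in range(10)]

# Ask for 200 partitions; only partitions that receive a key are non-empty.
sizes = shuffle_partition_sizes(keys, num_partitions=200)

print(len(sizes))           # non-empty partitions: never more than 29
print(max(sizes.values()))  # the partition holding state_0 gets at least 1000 rows
```

However many partitions are requested, at most 29 of them receive data, and the one holding the largest state does most of the work.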

broadcastHashJoin:

Broadcast the small RDD to all worker nodes. The parallelism of the large RDD is still maintained, and no shuffle is needed at all.

[image]

PS: The image may be ugly, but it is informative.
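The broadcastHashJoin idea described above can be sketched in plain Python. This is only an illustration of the technique, not Spark's implementation: the function name and the sample rows are hypothetical, and for simplicity both sides use the same key name (`state`) rather than `p.state = s.name`.

```python
def broadcast_hash_join(large_partitions, small_rows, key):
    """Inner-join each partition of the large side against a hash table built from the small side."""
    # Build the lookup table once; conceptually this is the copy sent to every worker.
    lookup = {}
    for row in small_rows:
        lookup.setdefault(row[key], []).append(row)

    joined = []
    for partition in large_partitions:
        # Each large partition is probed in place, so the large side is never shuffled.
        out = []
        for row in partition:
            for match in lookup.get(row[key], []):
                out.append({**row, **match})
        joined.append(out)
    return joined

# Hypothetical sample data: a small "states" table and a large table in two partitions.
states = [
    {"state": "Goa", "capital": "Panaji"},
    {"state": "Kerala", "capital": "Thiruvananthapuram"},
]
people = [
    [{"name": "A", "state": "Goa"}, {"name": "B", "state": "Kerala"}],
    [{"name": "C", "state": "Goa"}, {"name": "D", "state": "Punjab"}],
]

result = broadcast_hash_join(people, states, "state")
print(result)
```

The output keeps the same number of partitions as the large input, which is why the parallelism of the large side is preserved.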