Pra*_* R. 5 scala join query-optimization apache-spark apache-spark-sql
下面是我正在运行的示例代码.当这个spark作业运行时,使用sortmergejoin而不是broadcastjoin进行Dataframe连接.
def joinedDf (sqlContext: SQLContext,
txnTable: DataFrame,
countriesDfBroadcast: Broadcast[DataFrame]):
DataFrame = {
txnTable.as("df1").join((countriesDfBroadcast.value).withColumnRenamed("CNTRY_ID", "DW_CNTRY_ID").as("countries"),
$"df1.USER_CNTRY_ID" === $"countries.DW_CNTRY_ID", "inner")
}
joinedDf(sqlContext, txnTable, countriesDfBroadcast).write.parquet("temp")
Run Code Online (Sandbox Code Playgroud)
即使我在join语句中指定了broadcast()提示,也不会发生broadcastjoin.
优化器对数据帧进行散列分区,导致数据偏斜.
有没有人见过这种行为?
我使用Spark 1.6和HiveContext作为SQLContext在纱线上运行它.火花作业运行在200个执行器上.txnTable的数据大小为240GB,countriesDf的数据大小为5mb.
广播方式DataFrame
和访问方式都不正确.
标准广播不能用于处理分布式数据结构.如果要在a上执行广播连接,DataFrame
则应使用为广播broadcast
指定DataFrame
的标记功能:
import org.apache.spark.sql.functions.broadcast
val countriesDf: DataFrame = ???
val tmp: DataFrame = broadcast(
countriesDf.withColumnRenamed("CNTRY_ID", "DW_CNTRY_ID").as("countries")
)
txnTable.as("df1").join(
broadcast(tmp), $"df1.USER_CNTRY_ID" === $"countries.DW_CNTRY_ID", "inner")
Run Code Online (Sandbox Code Playgroud)
在内部,它将collect
tmp
不会从内部和广播转换.
热切评估连接参数.即使可以使用SparkContext.broadcast
分布式数据结构,也可以在join
调用之前在本地评估广播值.这就是为什么你的功能可以工作但不执行广播连接.
归档时间: |
|
查看次数: |
5909 次 |
最近记录: |