在Spark 1.6中加入数据帧时没有发生广播

Pra*_* R. 5 scala join query-optimization apache-spark apache-spark-sql

下面是我正在运行的示例代码.当这个spark作业运行时,使用sortmergejoin而不是broadcastjoin进行Dataframe连接.

def joinedDf (sqlContext: SQLContext,
              txnTable:   DataFrame,
              countriesDfBroadcast: Broadcast[DataFrame]): 
              DataFrame = {
                    txnTable.as("df1").join((countriesDfBroadcast.value).withColumnRenamed("CNTRY_ID", "DW_CNTRY_ID").as("countries"),
                    $"df1.USER_CNTRY_ID" === $"countries.DW_CNTRY_ID", "inner")
              }
joinedDf(sqlContext, txnTable, countriesDfBroadcast).write.parquet("temp")  
Run Code Online (Sandbox Code Playgroud)

即使我在join语句中指定了broadcast()提示,也不会发生broadcastjoin.

优化器对数据帧进行散列分区,导致数据偏斜.

有没有人见过这种行为?

我使用Spark 1.6和HiveContext作为SQLContext在纱线上运行它.火花作业运行在200个执行器上.txnTable的数据大小为240GB,countriesDf的数据大小为5mb.

zer*_*323 8

广播方式DataFrame和访问方式都不正确.

  • 标准广播不能用于处理分布式数据结构.如果要在a上执行广播连接,DataFrame则应使用为广播broadcast指定DataFrame的标记功能:

    import org.apache.spark.sql.functions.broadcast
    
    val countriesDf: DataFrame = ???
    val tmp: DataFrame = broadcast(
      countriesDf.withColumnRenamed("CNTRY_ID", "DW_CNTRY_ID").as("countries")
    ) 
    
    txnTable.as("df1").join(
      broadcast(tmp), $"df1.USER_CNTRY_ID" === $"countries.DW_CNTRY_ID", "inner")
    
    Run Code Online (Sandbox Code Playgroud)

    在内部,它将collect tmp不会从内部和广播转换.

  • 热切评估连接参数.即使可以使用SparkContext.broadcast分布式数据结构,也可以在join调用之前在本地评估广播值.这就是为什么你的功能可以工作但不执行广播连接.