相关疑难解决方法(0)

如何将RDD,Dataframe或Dataset直接转换为Broadcast变量而不收集?

有没有办法(或任何计划)能够将Spark分布式集合(RDDs DataframeDatasets)直接转换为Broadcast变量而不需要collect?公共API似乎没有"开箱即用"的东西,但是可以在较低级别完成某些事情吗?

我可以想象这些操作有2倍的加速潜力(或更多?).为了解释我的意思,让我们通过一个例子来解释:

val myUberMap: Broadcast[Map[String, String]] =
  sc.broadcast(myStringPairRdd.collect().toMap)

someOtherRdd.map(someCodeUsingTheUberMap)
Run Code Online (Sandbox Code Playgroud)

这会导致所有数据被收集到驱动程序,然后广播数据.这意味着数据通过网络发送两次.

什么会是这样的:

val myUberMap: Broadcast[Map[String, String]] =
  myStringPairRdd.toBroadcast((a: Array[(String, String)]) => a.toMap)

someOtherRdd.map(someCodeUsingTheUberMap)
Run Code Online (Sandbox Code Playgroud)

在这里,Spark可以完全绕过收集数据,只是在节点之间移动数据.

奖金

此外,可能存在类似Monoid的API(有点像combineByKey),对于其中.toMap或者任何操作Array[T]都很昂贵但可以并行完成的情况.例如,构造某些Trie结构可能是昂贵的,这种功能可能导致算法设计的可怕范围.当IO运行时也可以运行此CPU活动 - 当前广播机制正在阻塞(即所有IO,然后是所有CPU,然后是所有IO).

澄清

在这里,连接不是(主要)用例,可以假设我稀疏地使用广播的数据结构.例如,键someOtherRdd不会覆盖键,myUberMap但我不知道我需要哪些键,直到我遍历someOtherRdd并假设我myUberMap多次使用.

我知道所有听起来都有点模糊,但重点是更一般的机器学习算法设计.

scala dataframe apache-spark apache-spark-sql

9
推荐指数
1
解决办法
1808
查看次数

在Spark 1.6中加入数据帧时没有发生广播

下面是我正在运行的示例代码.当这个spark作业运行时,使用sortmergejoin而不是broadcastjoin进行Dataframe连接.

def joinedDf (sqlContext: SQLContext,
              txnTable:   DataFrame,
              countriesDfBroadcast: Broadcast[DataFrame]): 
              DataFrame = {
                    txnTable.as("df1").join((countriesDfBroadcast.value).withColumnRenamed("CNTRY_ID", "DW_CNTRY_ID").as("countries"),
                    $"df1.USER_CNTRY_ID" === $"countries.DW_CNTRY_ID", "inner")
              }
joinedDf(sqlContext, txnTable, countriesDfBroadcast).write.parquet("temp")  
Run Code Online (Sandbox Code Playgroud)

即使我在join语句中指定了broadcast()提示,也不会发生broadcastjoin.

优化器对数据帧进行散列分区,导致数据偏斜.

有没有人见过这种行为?

我使用Spark 1.6和HiveContext作为SQLContext在纱线上运行它.火花作业运行在200个执行器上.txnTable的数据大小为240GB,countriesDf的数据大小为5mb.

scala join query-optimization apache-spark apache-spark-sql

5
推荐指数
1
解决办法
5909
查看次数