Aja*_*axx 10 join apache-spark rdd
我有两个RDD.一个RDD在500万到1000万个条目之间,另一个RDD在5亿到7.5亿个条目之间.在某些时候,我必须使用公共密钥加入这两个rdds.
val rddA = someData.rdd.map { x => (x.key, x); } // 10-million
val rddB = someData.rdd.map { y => (y.key, y); } // 600-million
var joinRDD = rddA.join(rddB);
Run Code Online (Sandbox Code Playgroud)
当spark决定加入时,它决定做一个ShuffledHashJoin.这导致rddB中的许多项目在网络上进行混洗.同样,一些rddA也在网络上进行了洗牌.在这种情况下,rddA太"大"而不能用作广播变量,但看起来像BroadcastHashJoin会更有效率.是否有提示使用BroadcastHashJoin的火花?(Apache Flink通过联接提示支持此功能).
如果没有,是增加autoBroadcastJoinThreshold的唯一选择吗?
更新7/14
我的表现问题似乎完全依赖于重新分配.通常,从HDFS读取的RDD将按块进行分区,但在这种情况下,源是一个镶木地板数据源[我制作].当spark(databricks)写出镶木地板文件时,它会为每个分区写入一个文件,同样地,它会为每个文件读取一个分区.因此,我发现最好的答案是在数据源的生成过程中,按密钥对其进行分区,然后写出镶木地板(然后自然是共同分区)并将其用作rddB.
给出的答案是正确的,但我认为有关镶木地板数据源的详细信息可能对其他人有用.
Eug*_*nev 21
您可以使用相同的分区程序对RDD进行分区,在这种情况下,具有相同密钥的分区将在同一个执行程序上并置.
在这种情况下,您将避免连接操作的随机播放.
Shuffle只会发生一次,当你更新parititoner时,如果你要缓存RDD的所有连接,那么应该是执行者的本地连接
import org.apache.spark.SparkContext._
class A
class B
val rddA: RDD[(String, A)] = ???
val rddB: RDD[(String, B)] = ???
val partitioner = new HashPartitioner(1000)
rddA.partitionBy(partitioner).cache()
rddB.partitionBy(partitioner).cache()
Run Code Online (Sandbox Code Playgroud)
您也可以尝试更新广播阈值大小,也许rddA可以广播:
--conf spark.sql.autoBroadcastJoinThreshold=300000000 # ~300 mb
Run Code Online (Sandbox Code Playgroud)
我们使用400mb进行广播连接,效果很好.
归档时间: |
|
查看次数: |
8393 次 |
最近记录: |