如何有效地将大型rdd连接到火花中的大型rdd?

Aja*_*axx 10 join apache-spark rdd

我有两个RDD.一个RDD在500万到1000万个条目之间,另一个RDD在5亿到7.5亿个条目之间.在某些时候,我必须使用公共密钥加入这两个rdds.

val rddA = someData.rdd.map { x => (x.key, x); } // 10-million
val rddB = someData.rdd.map { y => (y.key, y); } // 600-million
var joinRDD = rddA.join(rddB);
Run Code Online (Sandbox Code Playgroud)

当spark决定加入时,它决定做一个ShuffledHashJoin.这导致rddB中的许多项目在网络上进行混洗.同样,一些rddA也在网络上进行了洗牌.在这种情况下,rddA太"大"而不能用作广播变量,但看起来像BroadcastHashJoin会更有效率.是否有提示使用BroadcastHashJoin的火花?(Apache Flink通过联接提示支持此功能).

如果没有,是增加autoBroadcastJoinThreshold的唯一选择吗?

更新7/14

我的表现问题似乎完全依赖于重新分配.通常,从HDFS读取的RDD将按块进行分区,但在这种情况下,源是一个镶木地板数据源[我制作].当spark(databricks)写出镶木地板文件时,它会为每个分区写入一个文件,同样地,它会为每个文件读取一个分区.因此,我发现最好的答案是在数据源的生成过程中,按密钥对其进行分区,然后写出镶木地板(然后自然是共同分区)并将其用作rddB.

给出的答案是正确的,但我认为有关镶木地板数据源的详细信息可能对其他人有用.

Eug*_*nev 21

您可以使用相同的分区程序对RDD进行分区,在这种情况下,具有相同密钥的分区将在同一个执行程序上并置.

在这种情况下,您将避免连接操作的随机播放.

Shuffle只会发生一次,当你更新parititoner时,如果你要缓存RDD的所有连接,那么应该是执行者的本地连接

import org.apache.spark.SparkContext._

class A
class B

val rddA: RDD[(String, A)] = ???
val rddB: RDD[(String, B)] = ???

val partitioner = new HashPartitioner(1000)

rddA.partitionBy(partitioner).cache()
rddB.partitionBy(partitioner).cache()
Run Code Online (Sandbox Code Playgroud)

您也可以尝试更新广播阈值大小,也许rddA可以广播:

--conf spark.sql.autoBroadcastJoinThreshold=300000000 # ~300 mb
Run Code Online (Sandbox Code Playgroud)

我们使用400mb进行广播连接,效果很好.

  • 我理解--conf spark.sql.autoBroadcastJoinThreshold仅适用于Dataframes或Datasets(Spark SQL)之间的连接.它是否也用于RDD连接?谢谢. (2认同)