是否存在提供共分区连接的Spark SQL DataSource的任何实现 - 最有可能通过CoGroupRDD?我没有在现有的Spark代码库中看到任何用途.
在两个表具有相同数量和相同范围的分区键的情况下,动机将是大大减少混洗流量:在这种情况下,将存在Mx1而不是MxN shuffle扇出.
目前在Spark SQL中唯一的大规模连接实现似乎是ShuffledHashJoin - 它确实需要MxN shuffle扇出,因此很昂贵.
我需要使用Spark SQL或Dataframe API连接表.需要知道实现它的优化方式.
场景是:
实现这一目标的最佳方法是什么?如果有人遇到类似的问题,请分享您的经验.
from pyspark import SparkContext
sc = SparkContext()
rdd1 = sc.parallelize([('a', 1), ('b', 2), ('c', 3), ('d', 4)], numSlices=8)
rdd2 = rdd1.mapValues(lambda x: x)
Run Code Online (Sandbox Code Playgroud)
这些RDD具有相同的分区:
rdd1.keys().glom().collect()
>>> [[], ['a'], [], ['b'], [], ['c'], [], ['d']]
rdd2.keys().glom().collect()
>>> [[], ['a'], [], ['b'], [], ['c'], [], ['d']]
Run Code Online (Sandbox Code Playgroud)
这里有多个答案,表明加入共分区数据不会导致混乱,这对我来说很有意义.示例:共同分区的RDD的连接是否会导致Apache Spark中的混乱?
但是,当我使用PySpark加入这些共同分区的RDD时,数据被混洗到一个新的分区:
rdd1.join(rdd2).keys().glom().collect()
>>> [['a'], [], ['c'], ['b'], [], ['d'], [], [], [], [], [], [], [], [], [], []]
Run Code Online (Sandbox Code Playgroud)
即使我将新分区的数量设置为原始分区8,分区也会更改:
rdd1.join(rdd2, numPartitions=8).keys().glom().collect()
>>> [['a'], [], ['c'], ['b'], [], ['d'], [], []]
Run Code Online (Sandbox Code Playgroud)
为什么我不能避免使用这些共同分区的RDD进行洗牌?
我正在使用Spark …
我已经阅读了很多有关如何在pyspark中进行有效联接的内容。我发现实现高效联接的方法基本上是:
最后一个是我想尝试的,但是我找不到在pyspark中实现它的方法。我试过了:
df.repartition(numberOfPartitions,['parition_col1','partition_col2'])
Run Code Online (Sandbox Code Playgroud)
但这无济于事,直到我停止它仍需要花费很长时间,因为在最后的几项工作中卡住了火花。
因此,如何在pyspark中使用相同的分区程序并加快连接速度,甚至摆脱永远需要的时间?我需要使用哪个代码?
PD:即使在stackoverflow上,我也查看了其他文章,但是我仍然看不到代码。
为了减少加入两个 RDD 期间的混洗,我决定首先使用 HashPartitioner 对它们进行分区。这是我如何做到的。我做得对吗,还是有更好的方法来做到这一点?
val rddA = ...
val rddB = ...
val numOfPartitions = rddA.getNumPartitions
val rddApartitioned = rddA.partitionBy(new HashPartitioner(numOfPartitions))
val rddBpartitioned = rddB.partitionBy(new HashPartitioner(numOfPartitions))
val rddAB = rddApartitioned.join(rddBpartitioned)
Run Code Online (Sandbox Code Playgroud)