相关疑难解决方法(0)

spark SQL中的共同分区连接

是否存在提供共分区连接的Spark SQL DataSource的任何实现 - 最有可能通过CoGroupRDD?我没有在现有的Spark代码库中看到任何用途.

在两个表具有相同数量和相同范围的分区键的情况下,动机将是大大减少混洗流量:在这种情况下,将存在Mx1而不是MxN shuffle扇出.

目前在Spark SQL中唯一的大规模连接实现似乎是ShuffledHashJoin - 它确实需要MxN shuffle扇出,因此很昂贵.

apache-spark apache-spark-sql

14
推荐指数
1
解决办法
2859
查看次数

什么是在Spark SQL中连接大表的优化方法

我需要使用Spark SQL或Dataframe API连接表.需要知道实现它的优化方式.

场景是:

  1. 所有数据都以ORC格式存在于Hive中(基本数据帧和参考文件).
  2. 我需要加入一个从Hive读取的基本文件(Dataframe)和11-13其他参考文件来创建一个大的内存结构(400列)(大小约1 TB)

实现这一目标的最佳方法是什么?如果有人遇到类似的问题,请分享您的经验.

apache-spark apache-spark-sql

10
推荐指数
3
解决办法
1万
查看次数

PySpark加入混洗共同分区的RDD

from pyspark import SparkContext

sc = SparkContext()

rdd1 = sc.parallelize([('a', 1), ('b', 2), ('c', 3), ('d', 4)], numSlices=8)
rdd2 = rdd1.mapValues(lambda x: x)
Run Code Online (Sandbox Code Playgroud)

这些RDD具有相同的分区:

rdd1.keys().glom().collect()
>>> [[], ['a'], [], ['b'], [], ['c'], [], ['d']]

rdd2.keys().glom().collect()
>>> [[], ['a'], [], ['b'], [], ['c'], [], ['d']]
Run Code Online (Sandbox Code Playgroud)

这里有多个答案,表明加入共分区数据不会导致混乱,这对我来说很有意义.示例:共同分区的RDD的连接是否会导致Apache Spark中的混乱?

但是,当我使用PySpark加入这些共同分区的RDD时,数据被混洗到一个新的分区:

rdd1.join(rdd2).keys().glom().collect()
>>> [['a'], [], ['c'], ['b'], [], ['d'], [], [], [], [], [], [], [], [], [], []]
Run Code Online (Sandbox Code Playgroud)

即使我将新分区的数量设置为原始分区8,分区也会更改:

rdd1.join(rdd2, numPartitions=8).keys().glom().collect()
>>> [['a'], [], ['c'], ['b'], [], ['d'], [], []]
Run Code Online (Sandbox Code Playgroud)

为什么我不能避免使用这些共同分区的RDD进行洗牌?

我正在使用Spark …

partitioning join apache-spark pyspark

4
推荐指数
1
解决办法
763
查看次数

高效的pyspark加入

我已经阅读了很多有关如何在pyspark中进行有效联接的内容。我发现实现高效联接的方法基本上是:

  • 如果可以,请使用广播加入。(我通常不能,因为数据帧太大)
  • 考虑使用非常大的群集。(我宁愿不是因为$$$)。
  • 使用相同的分区程序

最后一个是我想尝试的,但是我找不到在pyspark中实现它的方法。我试过了:

df.repartition(numberOfPartitions,['parition_col1','partition_col2'])
Run Code Online (Sandbox Code Playgroud)

但这无济于事,直到我停止它仍需要花费很长时间,因为在最后的几项工作中卡住了火花。

因此,如何在pyspark中使用相同的分区程序并加快连接速度,甚至摆脱永远需要的时间?我需要使用哪个代码?

PD:即使在stackoverflow上,我也查看了其他文章,但是我仍然看不到代码。

apache-spark pyspark

3
推荐指数
1
解决办法
1419
查看次数

如何在加入 Spark 之前正确应用 HashPartitioner?

为了减少加入两个 RDD 期间的混洗,我决定首先使用 HashPartitioner 对它们进行分区。这是我如何做到的。我做得对吗,还是有更好的方法来做到这一点?

val rddA = ...
val rddB = ...

val numOfPartitions = rddA.getNumPartitions

val rddApartitioned = rddA.partitionBy(new HashPartitioner(numOfPartitions))
val rddBpartitioned = rddB.partitionBy(new HashPartitioner(numOfPartitions))

val rddAB = rddApartitioned.join(rddBpartitioned)
Run Code Online (Sandbox Code Playgroud)

scala partitioner apache-spark rdd

1
推荐指数
1
解决办法
960
查看次数