spark.sql.shuffle.partitions 的 200 个默认分区难题

the*_*tom 9 apache-spark

在许多帖子中都有这样的声明 - 如下以某种形式显示 - 由于一些关于改组、分区、由于 JOIN、AGGR 等等的问题:

... 一般而言,每当您执行 spark sql 聚合或连接对数据进行混洗时,这是结果分区的数量= 200。这是由 spark.sql.shuffle.partitions 设置的。...

所以,我的问题是:

  • 我们的意思是,如果我们将 DF 的分区设置为 765,例如,
    • 处理是针对 765 个分区进行的,但是输出被合并/重新分区标准为 200 - 这里指的是 word结果
    • 或者它是否在合并/重新分区到 200 个分区之后使用 200 个分区进行处理,然后再加入、AGGR?

我问,因为我从来没有看到一个明确的观点。

我做了以下测试:

// genned ad DS of some 20M short rows
df0.count
val ds1 = df0.repartition(765)
ds1.count
val ds2 = df0.repartition(765)
ds2.count

sqlContext.setConf("spark.sql.shuffle.partitions", "765")
// The above not included on 1st run, the above included on 2nd run.

ds1.rdd.partitions.size
ds2.rdd.partitions.size

val joined = ds1.join(ds2, ds1("time_asc") === ds2("time_asc"), "outer") 
joined.rdd.partitions.size
joined.count
joined.rdd.partitions.size
Run Code Online (Sandbox Code Playgroud)

在第一个测试中 -未定义 sqlContext.setConf("spark.sql.shuffle.partitions", "765"),处理和分区数量为 200。即使 SO post 45704156 声明它可能不适用于 DF - 这是一个 DS。

在第二个测试中 -定义 sqlContext.setConf("spark.sql.shuffle.partitions", "765"),处理和分区数量为 765。即使 SO post 45704156 声明它可能不适用于 DF - 这是一个DS。

Cha*_*Ray 7

Spark.sql.shuffle.partitions 是决定分区数的参数,它在执行连接或聚合等混洗时,即跨节点的数据移动位置。另一部分 spark.default.parallelism 将根据您的数据大小和最大块大小计算,在 HDFS 中为 128mb。因此,如果您的工作不进行任何 shuffle,它将考虑默认并行度值,或者如果您使用 rdd,您可以自己设置它。当洗牌发生时,它需要 200。

Val df = sc.parallelize(List(1,2,3,4,5),4).toDF() df.count() // 这将使用 4 个分区

Val df1 = df df1.except(df).count // 将生成 200 个具有 2 个阶段的分区


小智 7

这是您的猜测的组合。

假设您有一组包含 M 个分区的输入数据,并且您将 shuffle 分区设置为 N。

在执行 join 时,spark 会读取所有 M 个分区中的输入数据,并根据 N 个分区的键重新排列数据。想象一个简单的 hashpartitioner,应用在 key 上的 hash 函数看起来很像 A = hashcode(key) % N,然后这些数据被重新分配给负责处理 Ath 分区的节点。每个节点可以负责处理多个分区。

改组后,节点将工作在它们负责的分区中聚合数据。由于这里不需要进行额外的 shuffle,节点可以直接产生输出。

所以总而言之,您的输出将合并到 N 个分区,但是合并是因为它在 N 个分区中处理,而不是因为 spark 应用了一个额外的 shuffle 阶段来专门将您的输出数据重新分区到 N。