在许多帖子中都有这样的声明 - 如下以某种形式显示 - 由于一些关于改组、分区、由于 JOIN、AGGR 等等的问题:
... 一般而言,每当您执行 spark sql 聚合或连接对数据进行混洗时,这是结果分区的数量= 200。这是由 spark.sql.shuffle.partitions 设置的。...
所以,我的问题是:
我问,因为我从来没有看到一个明确的观点。
我做了以下测试:
// genned ad DS of some 20M short rows
df0.count
val ds1 = df0.repartition(765)
ds1.count
val ds2 = df0.repartition(765)
ds2.count
sqlContext.setConf("spark.sql.shuffle.partitions", "765")
// The above not included on 1st run, the above included on 2nd run.
ds1.rdd.partitions.size
ds2.rdd.partitions.size
val joined = ds1.join(ds2, ds1("time_asc") === ds2("time_asc"), "outer")
joined.rdd.partitions.size
joined.count
joined.rdd.partitions.size
Run Code Online (Sandbox Code Playgroud)
在第一个测试中 -未定义 sqlContext.setConf("spark.sql.shuffle.partitions", "765"),处理和分区数量为 200。即使 SO post 45704156 声明它可能不适用于 DF - 这是一个 DS。
在第二个测试中 -定义 sqlContext.setConf("spark.sql.shuffle.partitions", "765"),处理和分区数量为 765。即使 SO post 45704156 声明它可能不适用于 DF - 这是一个DS。
Spark.sql.shuffle.partitions 是决定分区数的参数,它在执行连接或聚合等混洗时,即跨节点的数据移动位置。另一部分 spark.default.parallelism 将根据您的数据大小和最大块大小计算,在 HDFS 中为 128mb。因此,如果您的工作不进行任何 shuffle,它将考虑默认并行度值,或者如果您使用 rdd,您可以自己设置它。当洗牌发生时,它需要 200。
Val df = sc.parallelize(List(1,2,3,4,5),4).toDF() df.count() // 这将使用 4 个分区
Val df1 = df df1.except(df).count // 将生成 200 个具有 2 个阶段的分区
小智 7
这是您的猜测的组合。
假设您有一组包含 M 个分区的输入数据,并且您将 shuffle 分区设置为 N。
在执行 join 时,spark 会读取所有 M 个分区中的输入数据,并根据 N 个分区的键重新排列数据。想象一个简单的 hashpartitioner,应用在 key 上的 hash 函数看起来很像 A = hashcode(key) % N,然后这些数据被重新分配给负责处理 Ath 分区的节点。每个节点可以负责处理多个分区。
改组后,节点将工作在它们负责的分区中聚合数据。由于这里不需要进行额外的 shuffle,节点可以直接产生输出。
所以总而言之,您的输出将合并到 N 个分区,但是合并是因为它在 N 个分区中处理,而不是因为 spark 应用了一个额外的 shuffle 阶段来专门将您的输出数据重新分区到 N。
| 归档时间: |
|
| 查看次数: |
12830 次 |
| 最近记录: |