Pan*_*nos 4 shuffle apache-spark
我在本地模式下使用 Spark 2.1,我正在运行这个简单的应用程序。
val N = 10 << 20
sparkSession.conf.set("spark.sql.shuffle.partitions", "5")
sparkSession.conf.set("spark.sql.autoBroadcastJoinThreshold", (N + 1).toString)
sparkSession.conf.set("spark.sql.join.preferSortMergeJoin", "false")
val df1 = sparkSession.range(N).selectExpr(s"id as k1")
val df2 = sparkSession.range(N / 5).selectExpr(s"id * 3 as k2")
df1.join(df2, col("k1") === col("k2")).count()
Run Code Online (Sandbox Code Playgroud)
在这里,范围(N)创建了一个Long数据集(具有唯一值),所以我假设的大小
- df1 = N * 8 字节 ~ 80MB
- df2 = N / 5 * 8 字节 ~ 16MB
好的,现在让我们以 df1 为例。 df1 由 8 个分区和5 个 shuffledRDDs 组成,所以我假设
- 映射器数量 (M) = 8
- 减速器数量 (R) = 5
由于分区数较低,Spark 将使用 Hash Shuffle 将在磁盘中创建M * R 个文件,但我不明白是否每个文件都有所有数据,因此each_file_size = data_size导致M * R * data_size 个文件或all_files = data_size。
但是,在执行此应用程序时,df1 = 160MB 的随机写入与上述任何一种情况都不匹配。
我在这里缺少什么?为什么shuffle写入数据的大小翻了一番?
首先,让我们看看是什么data size total(min, med, max)意思:
根据SQLMetrics.scala#L88和ShuffleExchange.scala#L43,data size total(min, med, max)我们看到的是dataSizeshuffle metric的最终值。那么,它是如何更新的呢?每次序列化记录时都会更新它:UnsafeRowSerializer.scala#L66 by dataSize.add(row.getSizeInBytes)(UnsafeRow是 Spark SQL 中记录的内部表示)。
在内部,UnsafeRow由 a 支持byte[],并在序列化期间直接复制到底层输出流,其getSizeInBytes()方法仅返回byte[]. 因此,最初的问题转换为:为什么字节表示是long记录唯一列的两倍大?这个UnsafeRow.scala文档给了我们答案:
每个元组由三部分组成:[空位集] [值] [可变长度部分]
位集用于空值跟踪并与 8 字节字边界对齐。每个字段存储一位。
因为它是 8 字节字对齐的,所以唯一的 1 个空位占用了另一个 8 字节,与长列的宽度相同。因此,每个UnsafeRow使用 16 个字节表示您的一长列行。
| 归档时间: |
|
| 查看次数: |
1585 次 |
| 最近记录: |