Apache Spark - shuffle 写入的数据多于输入数据的大小

Pan*_*nos 4 shuffle apache-spark

我在本地模式下使用 Spark 2.1,我正在运行这个简单的应用程序。

val N = 10 << 20

sparkSession.conf.set("spark.sql.shuffle.partitions", "5")
sparkSession.conf.set("spark.sql.autoBroadcastJoinThreshold", (N + 1).toString)
sparkSession.conf.set("spark.sql.join.preferSortMergeJoin", "false")

val df1 = sparkSession.range(N).selectExpr(s"id as k1")
val df2 = sparkSession.range(N / 5).selectExpr(s"id * 3 as k2")

df1.join(df2, col("k1") === col("k2")).count()
Run Code Online (Sandbox Code Playgroud)

在这里,范围(N)创建了一个Long数据集(具有唯一值),所以我假设的大小

  • df1 = N * 8 字节 ~ 80MB
  • df2 = N / 5 * 8 字节 ~ 16MB

好的,现在让我们以 df1 为例。 df1 由 8 个分区5 个 shuffledRDDs 组成,所以我假设

  • 映射器数量 (M) = 8
  • 减速器数量 (R) = 5

由于分区数较低,Spark 将使用 Hash Shuffle 将在磁盘中创建M * R 个文件,但我不明白是否每个文件都有所有数据,因此each_file_size = data_size导致M * R * data_size 个文件或all_files = data_size

但是,在执行此应用程序时,df1 = 160MB 的随机写入与上述任何一种情况都不匹配。

星火用户界面

我在这里缺少什么?为什么shuffle写入数据的大小翻了一番?

yjs*_*hen 5

首先,让我们看看是什么data size total(min, med, max)意思:

根据SQLMetrics.scala#L88ShuffleExchange.scala#L43data size total(min, med, max)我们看到的是dataSizeshuffle metric的最终值。那么,它是如何更新的呢?每次序列化记录时都会更新它:UnsafeRowSerializer.scala#L66 by dataSize.add(row.getSizeInBytes)(UnsafeRow是 Spark SQL 中记录的内部表示)。

在内部,UnsafeRow由 a 支持byte[],并在序列化期间直接复制到底层输出流,其getSizeInBytes()方法仅返回byte[]. 因此,最初的问题转换为:为什么字节表示是long记录唯一列的两倍大?这个UnsafeRow.scala文档给了我们答案:

每个元组由三部分组成:[空位集] [值] [可变长度部分]

位集用于空值跟踪并与 8 字节字边界对齐。每个字段存储一位。

因为它是 8 字节字对齐的,所以唯一的 1 个空位占用了另一个 8 字节,与长列的宽度相同。因此,每个UnsafeRow使用 16 个字节表示您的一长列行。