为什么Spark中repartition比partitionBy更快?

Rob*_*gas 9 apache-spark apache-spark-sql pyspark apache-spark-xml

我正在尝试将 Spark 用于一个非常简单的用例:给定大量文件(90k),其中包含数百万台设备的设备时间序列数据,将给定设备的所有时间序列读取分组到一组文件中(分割)。现在让\xe2\x80\x99s 假设我们的目标是 100 个分区,并且给定设备数据显示在同一个输出文件(只是同一个分区)中并不重要。

\n

考虑到这个问题,我们\xe2\x80\x99想出了两种方法来做到这一点 - repartitionthenwritewritewithpartitionBy应用于Writer. 其中任何一个的代码都非常简单:

\n

repartition(添加哈希列是为了确保与partitionBy下面的代码的比较是一对一的):

\n
\ndf = spark.read.format("xml") \\\n  .options(rowTag="DeviceData") \\\n  .load(file_path, schema=meter_data) \\\n  .withColumn("partition", hash(col("_DeviceName")).cast("Long") % num_partitions) \\\n  .repartition("partition") \\\n  .write.format("json") \\\n  .option("codec", "org.apache.hadoop.io.compress.GzipCodec") \\\n  .mode("overwrite") \\\n  .save(output_path)\n\n
Run Code Online (Sandbox Code Playgroud)\n

partitionBy:

\n
\ndf = spark.read.format("xml") \\\n  .options(rowTag="DeviceData") \\\n  .load(file_path, schema=meter_data) \\\n  .withColumn("partition", hash(col("_DeviceName")).cast("Long") % num_partitions) \\\n  .write.format("json") \\\n  .partitionBy(\xe2\x80\x9cpartition\xe2\x80\x9d) \\\n  .option("codec", "org.apache.hadoop.io.compress.GzipCodec") \\\n  .mode("overwrite") \\\n  .save(output_path)\n\n
Run Code Online (Sandbox Code Playgroud)\n

在我们的测试中,repartition速度比partitionBy. 为什么是这样?

\n

根据我的理解,repartition这会导致混乱,我的 Spark 知识告诉我要尽可能避免这种情况。另一方面,partitionBy(根据我的理解)仅引起每个节点本地的排序操作 - 不需要洗牌。我是否误解了一些让我认为partitionBy会更快的事情?

\n

Rin*_*haj 13

我认为@Oli 在他对主要答案的评论中完美地解释了这个问题。我只想补充我的 2 美分并尝试解释同样的事情。

假设当您读取 XML 文件 [90K 文件] 时,spark 将其读入N 个分区。这是根据spark.sql.files.maxPartitionBytes文件格式压缩类型等因素的数量决定的。

我们假设它是10K分区。这发生在代码的下面部分。

df = spark.read.format("xml") \
  .options(rowTag="DeviceData") \
  .load(file_path, schema=meter_data) \
Run Code Online (Sandbox Code Playgroud)

假设您使用num_partitions = 100 ,您将添加一个名为“分区”的新列,其值为 0-99。Spark 只是向现有的数据帧 [或 rdd] 添加一个新列,该数据帧被分成 10K 分区。

.withColumn("partition", hash(col("_DeviceName")).cast("Long") % num_partitions) \
Run Code Online (Sandbox Code Playgroud)

到目前为止,两个代码是相同的。

现在,让我们比较一下重新分区和分区所发生的情况

情况一:重新分区

.repartition("partition") \
.write.format("json") \
Run Code Online (Sandbox Code Playgroud)

在这里,您将根据具有 100 个不同值的“分区”列对现有数据框进行重新分区。因此,现有的数据帧将进行完全洗牌,将分区数量从10K 减少到 100。由于涉及完整的洗牌,因此该阶段的计算量很大。如果一个特定分区的大小确实很大[倾斜分区],这也可能会失败。

但这里的优点是,在写入发生的下一阶段,Spark 只需要向output_path写入100 个文件。每个文件仅具有与“分区”列的一个值相对应的数据

情况2:partitionBy

.write.format("json") \
.partitionBy("partition") \
Run Code Online (Sandbox Code Playgroud)

在这里,您要求 Spark 将现有数据帧写入由列"partition"的不同值分区的output_path中。您无处可要求 Spark 减少数据帧的现有分区数量。

因此spark会在output_path中创建新的文件夹 ,并写入与其中每个分区对应的数据。

output_path + "\partition=0\"
output_path + "\partition=1\"
output_path + "\partition=99\"
Run Code Online (Sandbox Code Playgroud)

现在,由于现有数据帧上有 10K Spark 分区,并假设最坏的情况,其中每个 10K 分区都具有“partition”列的所有不同值,Spark 将必须写入10K * 100 = 1M文件。即,所有 10K 分区的某些部分将写入“分区”列创建的所有 100 个文件夹中。这样spark将通过在output_path中创建子目录将1M文件写入到output_path中。优点是我们使用这种方法跳过了完全洗牌。

现在,与案例 1中的内存计算密集型 shuffle 相比,这会慢得多,因为 Spark 必须创建 1M 个文件并将它们写入持久存储。同样,最初到临时文件夹,然后到output_path

如果写入发生在 AWS S3 或 GCP Blob 等对象存储中,速度会慢得多

情况3:合并+partitionBy

.coalesce(num_partitions) \
.write.format("json") \
.partitionBy("partition") \
Run Code Online (Sandbox Code Playgroud)

在这种情况下,您将使用coalesce()将 Spark 分区的数量从 10K 减少到 100 ,并将其写入按列"partition" 分区的output_path

因此,假设最坏的情况,即这 100 个分区中的每一个都具有“partition”列的所有不同值,spark 将必须写入100 * 100 = 10K文件。

这仍然比情况 2快,但比情况 1慢。这是因为您正在使用coalesce()进行部分洗牌,但最终仍将10K 文件写入output_path

情况4:重新分区+partitionBy

.repartition("partition") \
.write.format("json") \
.partitionBy("partition") \
Run Code Online (Sandbox Code Playgroud)

在这种情况下,您将使用repartition()将 Spark 分区的数量从 10K 减少到 100 [列"partition"的不同值] ,并将其写入按列"partition"分区的output_path

因此,这 100 个分区中的每一个都只有“分区”列的一个不同值,spark 将必须写入100 * 1 = 100 个文件。由partitionBy()创建的每个子文件夹内仅包含1 个文件。

这将花费与案例 1相同的时间,因为这两种情况都涉及完全洗牌,然后写入100 个文件。这里唯一的区别是 100 个文件将位于output_path下的子文件夹中。

此设置对于在通过 Spark 或 Hive 读取 output_path 时过滤器的谓词下推非常有用。

结论:

尽管partitionBy比repartition更快,但根据数据帧分区的数量和这些分区内数据的分布,仅使用partitionBy可能最终会代价高昂。


Oli*_*Oli 12

TLDR:当您调用 时,Spark 会触发排序partitionBy,而不是哈希重新分区。这就是为什么你的情况要慢得多。

我们可以用一个玩具示例来检查这一点:

spark.range(1000).withColumn("partition", 'id % 100)
    .repartition('partition).write.csv("/tmp/test.csv")
Run Code Online (Sandbox Code Playgroud)

有向无环图1

不要注意灰色阶段,它被跳过,因为它是在之前的作业中计算的。

然后,与partitionBy

spark.range(1000).withColumn("partition", 'id % 100)
    .write.partitionBy("partition").csv("/tmp/test2.csv")
Run Code Online (Sandbox Code Playgroud)

有向无环图2

您可以检查是否可以添加repartitionbefore partitionBy,排序仍然存在。那么发生了什么事?请注意,第二个 DAG 中的排序不会触发随机播放。这是一个地图分区。事实上,当您调用 时partitionBy,spark 并不会像人们一开始所期望的那样对数据进行混洗。Spark 单独对每个分区进行排序,然后每个执行器将其数据写入相应分区的单独文件中。因此,请注意,partitionBy您写的不是num_partitions文件,而是文件之间的num_partitions内容num_partitions * num_executors。每个分区的每个执行程序都有一个文件,其中包含属于该分区的数据。