使用Spark的partitioningBy方法对S3中的大型偏斜数据集进行分区

Pow*_*ers 6 partitioning apache-spark apache-spark-sql

我正在尝试使用Spark将较大的分区数据集写到磁盘上,并且该partitionBy算法在我尝试过的两种方法中都遇到了麻烦。

分区严重偏斜-有些分区很大,有些很小。

问题1

当我之前使用repartition时repartitionBy,Spark将所有分区写为单个文件,即使是大文件也是如此

val df = spark.read.parquet("some_data_lake")
df
  .repartition('some_col).write.partitionBy("some_col")
  .parquet("partitioned_lake")
Run Code Online (Sandbox Code Playgroud)

这需要永远执行,因为Spark不会并行编写大型分区。如果其中一个分区具有1TB的数据,Spark将尝试将整个1TB的数据作为单个文件写入。

问题2

当我不使用时repartition,Spark会写出太多文件。

此代码将写出疯狂的文件。

df.write.partitionBy("some_col").parquet("partitioned_lake")
Run Code Online (Sandbox Code Playgroud)

我在一个很小的8 GB数据子集上运行了此操作,Spark写入了85,000+个文件!

当我尝试在生产数据集上运行此文件时,一个包含1.3 GB数据的分区被写为3,100个文件。

我想要什么

我希望每个分区都写成1 GB文件。因此,具有7 GB数据的分区将作为7个文件被写出,而具有0.3 GB数据的分区将作为单个文件被写出。

我最好的前进道路是什么?

小智 9

Nick Chammas 方法的另一种方法是创建一个按主分区键分区的 row_number() 列,然后将其除以您希望在每个分区中出现的确切记录数。用 SPARK SQL 表示如下:

SELECT /*+ REPARTITION(id, file_num) */
  id,
  FLOOR(ROW_NUMBER() OVER(PARTITION BY id ORDER BY NULL) / rows_per_file) AS file_num
FROM skewed_data

Run Code Online (Sandbox Code Playgroud)

这样做的另一个好处是,它允许您通过使用辅助键子句将大部分数据跨文件放置在一个分区中。ORDER BY如果与辅助键关联的行号跨越两个值,则不保证辅助键位于同一file_num位置。也有可能(事实上有点可能)最终得到一个文件,每个分区中的记录很少。

  • 不过,最大的好处不是托管,因为您无法保证边界出现在哪里。最大的好处是,这种方法比 Nick Chammas 的方法少使用一个阶段,因此性能更高。缺少连接会增加性能提升,这意味着相等性检查会减少。 (2认同)

Nic*_*mas 7

我希望每个分区都被写成 1 GB 的文件。因此,具有 7 GB 数据的分区将作为 7 个文件写出,而具有 0.3 GB 数据的分区将作为单个文件写出。

当前接受的答案在大多数情况下可能已经足够好,但并没有完全满足将 0.3 GB 分区写入单个文件的要求。相反,它将numPartitions为每个输出分区目录写出文件,包括 0.3 GB 分区。

您正在寻找的是一种根据数据分区的大小动态扩展输出文件数量的方法。为此,我们将基于 10465355 的方法rand()来控制 的行为repartition(),并rand()根据我们想要用于该分区的文件数量来扩展 的范围。

很难通过输出文件大小来控制分区行为,因此我们将使用每个输出文件所需的大致行数来控制它。

我将在 Python 中提供一个演示,但该方法在 Scala 中基本相同。

from pyspark.sql import SparkSession
from pyspark.sql.functions import rand

spark = SparkSession.builder.getOrCreate()
skewed_data = (
    spark.createDataFrame(
        [(1,)] * 100 + [(2,)] * 10 + [(3,), (4,), (5,)],
        schema=['id'],
    )
)
partition_by_columns = ['id']
desired_rows_per_output_file = 10

partition_count = skewed_data.groupBy(partition_by_columns).count()

partition_balanced_data = (
    skewed_data
    .join(partition_count, on=partition_by_columns)
    .withColumn(
        'repartition_seed',
        (
            rand() * partition_count['count'] / desired_rows_per_output_file
        ).cast('int')
    )
    .repartition(*partition_by_columns, 'repartition_seed')
)
Run Code Online (Sandbox Code Playgroud)

无论分区大小如何倾斜,这种方法都会平衡输出文件的大小。每个数据分区都将获得它需要的文件数,以便每个输出文件大致具有请求的行数。

这种方法的先决条件是计算每个分区的大小,您可以在partition_count. 如果您真的想动态扩展每个分区的输出文件数量,这是不可避免的。

为了证明这是正确的,让我们检查分区内容:

from pyspark.sql.functions import spark_partition_id

(
    skewed_data
    .groupBy('id')
    .count()
    .orderBy('id')
    .show()
)

(
    partition_balanced_data
    .select(
        *partition_by_columns,
        spark_partition_id().alias('partition_id'),
    )
    .groupBy(*partition_by_columns, 'partition_id')
    .count()
    .orderBy(*partition_by_columns, 'partition_id')
    .show(30)
)
Run Code Online (Sandbox Code Playgroud)

输出如下所示:

+---+-----+
| id|count|
+---+-----+
|  1|  100|
|  2|   10|
|  3|    1|
|  4|    1|
|  5|    1|
+---+-----+

+---+------------+-----+
| id|partition_id|count|
+---+------------+-----+
|  1|           7|    9|
|  1|          49|    6|
|  1|          53|   14|
|  1|         117|   12|
|  1|         126|   10|
|  1|         136|   11|
|  1|         147|   15|
|  1|         161|    7|
|  1|         177|    7|
|  1|         181|    9|
|  2|          85|   10|
|  3|          76|    1|
|  4|         197|    1|
|  5|          10|    1|
+---+------------+-----+
Run Code Online (Sandbox Code Playgroud)

根据需要,每个输出文件大约有 10 行。id=1得到10个分区,id=2得到1个分区,id={3,4,5}每个得到1个分区。

该解决方案平衡了输出文件的大小,而不考虑数据倾斜,并且不依赖于maxRecordsPerFile.


104*_*ica 6

最简单的解决方案是向其中添加一个或多个列,repartition并显式设置分区数。

val numPartitions = ???

df.repartition(numPartitions, $"some_col", $"some_other_col")
 .write.partitionBy("some_col")
 .parquet("partitioned_lake")
Run Code Online (Sandbox Code Playgroud)

哪里:

  • numPartitions -应该是写入分区目录的所需文件数的上限(实际数字可以低一些)。
  • $"some_other_col"(和可选的附加列)应具有较高的基数,并且应独立于$"some_column(这两者之间应具有功能依赖性,并且不应高度相关)。

    如果数据不包含此类列,则可以使用o.a.s.sql.functions.rand

    import org.apache.spark.sql.functions.rand
    
    df.repartition(numPartitions, $"some_col", rand)
      .write.partitionBy("some_col")
      .parquet("partitioned_lake")
    
    Run Code Online (Sandbox Code Playgroud)

  • 我有完全相同的情况,所以我选择加盐,并且我希望每个分区的文件不超过 5 个。但令我困惑的是,我在保存阶段看到了 5 个任务,而我不太明白的是,我认为数据应该是平衡的,但也应该以最好地利用集群资源的方式进行组织 (2认同)