How can I force a specific partitioning in a PySpark DataFrame?

abe*_*bop 4 partitioning apache-spark pyspark

Suppose I have a DataFrame with a partition_id column:

n_partitions = 2

df = spark.sparkContext.parallelize([
    [1, 'A'],
    [1, 'B'],
    [2, 'A'],
    [2, 'C']
]).toDF(('partition_id', 'val'))

How can I repartition the DataFrame so that each value of partition_id is guaranteed to go to a separate partition, and the number of actual partitions is exactly equal to the number of distinct values of partition_id?

If I use hash partitioning, i.e. df.repartition(n_partitions, 'partition_id'), the correct number of partitions is guaranteed, but some partitions may be empty while others contain multiple values of partition_id because of hash collisions.
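
For example, one can check where rows actually end up after a hash repartition (a quick sketch using spark_partition_id; whether a collision actually shows up depends on the hash values):

from pyspark.sql.functions import spark_partition_id

# Count rows per physical partition after a plain hash repartition;
# some partitions may be empty while others hold several partition_id values.
(df.repartition(n_partitions, 'partition_id')
    .withColumn('pid', spark_partition_id())
    .groupBy('pid')
    .count()
    .show())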

hi-*_*zir 10

There is no such option in Python and the DataFrame API. The partitioning API of Dataset is not pluggable and supports only the predefined range and hash partitioning schemes.
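
For reference (my own note, not part of the original answer), the only built-in DataFrame-level choices are hash partitioning via repartition and range partitioning via repartitionByRange (Spark 2.3+); neither gives a 1-to-1 guarantee:

# hash partitioning: correct partition count, but collisions are possible
df.repartition(n_partitions, 'partition_id')

# range partitioning: contiguous ranges of the sort key, also no 1-to-1 guarantee
df.repartitionByRange(n_partitions, 'partition_id')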

You can convert the data to an RDD, partition it with a custom partitioner, and convert it back to a DataFrame:

from pyspark.sql.functions import col, struct, spark_partition_id

# map each distinct partition_id to a sequential partition index
mapping = {k: i for i, k in enumerate(
    df.select("partition_id").distinct().rdd.flatMap(lambda x: x).collect()
)}

result = (df
    # key each record by partition_id, packing the full row into a struct
    .select("partition_id", struct([c for c in df.columns]))
    # custom partitioner: route each key to its own partition
    .rdd.partitionBy(len(mapping), lambda k: mapping[k])
    # drop the key, keep the packed rows, and restore the original schema
    .values()
    .toDF(df.schema))

result.withColumn("actual_partition_id", spark_partition_id()).show()
# +------------+---+-------------------+
# |partition_id|val|actual_partition_id|
# +------------+---+-------------------+
# |           1|  A|                  0|
# |           1|  B|                  0|
# |           2|  A|                  1|
# |           2|  C|                  1|
# +------------+---+-------------------+

Keep in mind that this only creates a specific data distribution; it does not set up a partitioner that the Catalyst optimizer can make use of.
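
One way to see this (a sketch of my own, assuming the result DataFrame from above): a later aggregation on partition_id still plans a shuffle, because the RDD-level partitioning is invisible to Catalyst.

# The physical plan still contains an Exchange (hash partitioning) step,
# even though each partition_id already lives in its own partition.
result.groupBy("partition_id").count().explain()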

  • @philantrovert Indeed, but the OP seems to want a 1:1 relationship, which [hashing cannot guarantee](/sf/ask/2199707751/). (2 upvotes)
  • @justincress: Indeed, after the second step the partition_id column is included twice: once as a standalone column and once as a field of the struct column. partitionBy interprets each Row as a key-value mapping, with the first column as the key and the remaining columns as the value. .values() then drops the key column (partition_id in this case), which is redundant at that point. The solution works on Spark 2.2+. (2 upvotes)

abe*_*bop 1

The previously accepted solution requires converting from DataFrame to RDD and back again, which is quite slow because of the repartitioning it requires.

The solution given below is more performant: the extra Spark operations are very fast, so overall it requires no more computation/shuffling than a simple repartition.

At a high level, we use an iterative algorithm to invert Spark's partitioning hash, and then use this inverse mapping to create a new partitioning key that, when partitioned on, yields the desired distribution of partitions.

import itertools

from pyspark.sql import Row
import pyspark.sql.functions as F

def construct_reverse_hash_map(spark, n_partitions, fact = 10):
    """
    Given a target number of partitions, this function constructs a
    mapping from each integer partition ID (0 through N-1) to an
    arbitrary integer, which Spark will hash to that partition ID.

    By using these new (seemingly arbitrary) integers as a column
    to repartition on, one can guarantee a 1-to-1 mapping of
    partition levels to the final partitions.

    Example return value, for n_partitions=10:

    {
        5: 80,
        9: 90,
        8: 94,
        7: 99,
        0: 92,
        1: 98,
        6: 87,
        2: 91,
        3: 85,
        4: 93
    }

    If one had a column in a dataframe with 10 unique values, 80, 90, 94,
    etc, and then partitioned on this column into 10 partitions, then
    every row with value 80 would go into partition 5, every row with
    value 90 would go into partition 9, and so on.

    :param spark: SparkSession object
    :param n_partitions: desired number of unique partitions
    :param fact: initial search space of IDs will be n_partitions*fact
    :return: dictionary mapping from sequential partition IDs to hashed
             partition IDs.
    """

    max_retries = 10
    for i in range(max_retries):
        bigger_factor = fact * 2 ** i
        hashes = (
            spark.createDataFrame([Row(orig_id=i) for i in list(range(n_partitions * bigger_factor))])
            .withColumn("h", F.hash("orig_id") % n_partitions)
            .select("orig_id", F.when(F.col("h") >= 0, F.col("h")).otherwise(F.col("h") + n_partitions).alias("new_id"))
        )
        n_unique_ids = hashes.groupBy("new_id").count().count()

        if n_unique_ids == n_partitions:
            # find a mapping between the hashed values and the original partition IDs
            return {row["new_id"]: row["orig_id"] for row in hashes.collect()}
    raise Exception("Spark reverse hash algorithm failed to converge")

def add_deterministic_1to1_partitioner(df, original_part_col, new_part_col, part_levels, seed=42):
    """
    Returns a DataFrame with a new column which can be repartitioned on to give exactly the desired partitions. We determine what
    values this column will have by inverting Spark's hash.

    :param df: original DataFrame
    :param original_part_col: logical column to be repartitioned on
    :param new_part_col: new column to be actually repartitioned on
    :param part_levels: list of unique values of part_col
    :param seed: seed value for quasirandom assignment to partitions
    :return: original DataFrame plus new column for repartitioning
    """

    part_level_map = {part_level: i for i, part_level in enumerate(part_levels)}
    part_level_map_expr = F.create_map(*[F.lit(x) for x in itertools.chain(*list(part_level_map.items()))])

    hash_map = construct_reverse_hash_map(df.sql_ctx.sparkSession, len(part_level_map))
    hash_map_expr = F.create_map(*[F.lit(x) for x in itertools.chain(*list(hash_map.items()))])

    return (
        # convert partition level to sequential numeric partition ID
        df.withColumn("__part_id__", part_level_map_expr[F.col(original_part_col)].astype("bigint"))
        # add col which will result in 1-to-1 partitioning when repartitioned on
        .withColumn(new_part_col, hash_map_expr[F.col("__part_id__")].astype("bigint"))
        .drop("__part_id__")
    )

Demonstrating the functions:

# construct example DataFrame
data = [
    [1, 'A0'],
    [1, 'A1'],
    [2, 'B0'],
    [2, 'B1'],
    [3, 'C0'],
    [3, 'C1'],
]
partition_levels = list(set([pid for pid, _ in data]))
n_partitions = len(partition_levels)

df = spark.sparkContext.parallelize(data).toDF(('partition_id', 'val'))

A naive repartition on the desired partition column leads to collisions; note that rows with partition IDs 1 and 2 are both placed in partition 2:

df_naive_repartition = df.repartition(n_partitions, "partition_id").withColumn("actual_partition_id", F.spark_partition_id())
df_naive_repartition.orderBy("partition_id", "val").show()

#+------------+---+-------------------+
#|partition_id|val|actual_partition_id|
#+------------+---+-------------------+
#|           1| A0|                  2|
#|           1| A1|                  2|
#|           2| B0|                  2|
#|           2| B1|                  2|
#|           3| C0|                  0|
#|           3| C1|                  0|
#+------------+---+-------------------+

Whereas adding the deterministic partitioning key and then repartitioning on it results in each group being assigned to exactly one partition:

df = add_deterministic_1to1_partitioner(df, "partition_id", "deterministic_partition_id", partition_levels)
df_1to1_repartition = df.repartition(n_partitions, "deterministic_partition_id").withColumn("actual_partition_id", F.spark_partition_id())
df_1to1_repartition.orderBy("partition_id", "val").show()

#+------------+---+--------------------------+-------------------+
#|partition_id|val|deterministic_partition_id|actual_partition_id|
#+------------+---+--------------------------+-------------------+
#|           1| A0|                        28|                  0|
#|           1| A1|                        28|                  0|
#|           2| B0|                        29|                  1|
#|           2| B1|                        29|                  1|
#|           3| C0|                        27|                  2|
#|           3| C1|                        27|                  2|
#+------------+---+--------------------------+-------------------+

(The deterministic_partition_id column can be dropped after repartitioning; I show it here only to make it clearer how the hash mapping function works.)
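
For completeness, a one-line sketch of that cleanup (assuming the df_1to1_repartition from above):

# drop the helper column; the physical distribution of rows is unchanged
df_final = df_1to1_repartition.drop("deterministic_partition_id")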