在pyspark中,为什么`limit`后跟`repartition`会创建完全相等的分区大小?

Isa*_*aac 5 python apache-spark pyspark

根据pyspark文档repartition应该使用哈希分区,这将导致分区大小略有不等。但是,我发现通过在之前加上limit,将产生完全相等的分区大小。这可以通过在pyspark shell中运行以下命令来显示:

df = spark.createDataFrame([range(5)] * 100)

def count_part_size(part_iter):
    yield len(list(part_iter))

print(df.repartition(20).rdd.mapPartitions(count_part_size).collect())
# [4, 4, 4, 5, 4, 4, 5, 4, 5, 6, 6, 6, 7, 5, 5, 5, 5, 6, 5, 5]

print(df.limit(100).repartition(20).rdd.mapPartitions(count_part_size).collect())
# [5, 5, 5, 5, 5, 5, 5, 5, 5, 5, 5, 5, 5, 5, 5, 5, 5, 5, 5, 5]
Run Code Online (Sandbox Code Playgroud)

如果repartition使用哈希分区程序,为什么在这种情况下会产生完全相等的分区大小?如果未使用哈希分区程序,则使用哪种分区程序?

顺便说一句,我正在使用python版本2.7.15和spark版本2.0.2

104*_*ica 4

这里有四个因素:

\n\n
    \n
  • 如果未提供分区表达式,repartition则不使用HashPartitioning,或者具体来说,不直接使用它。相反,它使用RoundRobinPartitioning, (正如你可能猜到的那样)

    \n\n
    \n

    从随机分区开始,将元素均匀分布在输出分区上。

    \n
    \n\n

    在内部,它从随机点开始scala.Int在每个分区上生成一个序列。仅传递这些值。HashPartitioner

  • \n
  • 它以这种方式工作,因为Int hashCode这只是身份 - 换句话说

    \n\n

    \xe2\x88\x80x\xe2\x88\x88Int x = hashCode(x)

    \n\n

    (顺便说一句,这与hashScalaInt范围内的 CPython 的行为相同 -2147483648 到 2147483647。这些哈希值根本就不是为了加密安全而设计的)因此,应用于HashPartitioner一系列Int值会导致实际的循环分配。

    \n\n

    因此在这种情况下,HashPartitioner只需作为模运算符即可。

  • \n
  • LIMIT在重新分区之前应用,以便所有值首先被洗牌到单个节点。因此,仅Int使用一个值序列。

  • \n
  • 分区数是数据集大小的除数。由于数据可以均匀分布在分区之间。

  • \n
\n\n

总体而言,它是预期行为(每个分区应均匀分布在输出分区之间)、管道属性(只有一个输入分区)和数据(数据集可以均匀分布)的组合。

\n