Spark:重新分区与partitionBy中的列参数顺序

y2k*_*ham 6 partitioning dataframe apache-spark apache-spark-sql

考虑的方法(Spark 2.2.1):

  1. DataFrame.repartition(带partitionExprs: Column*参数的两个实现)
  2. DataFrameWriter.partitionBy

注意:这个问题不会问这些方法之间的区别

文档partitionBy:

如果指定,输出奠定了类似文件系统Hive分区方案.例如,当我们Dataset按年和月分区时,目录布局如下所示:

  • 年= 2016 /月= 01 /
  • 年= 2016 /月= 02 /

由此,我推断列参数顺序将决定目录布局; 因此它是相关的.

文档repartition:

返回Dataset由给定分区表达式分区的新分区,使用spark.sql.shuffle.partitions分区数.结果Dataset散列分区.

根据我目前的理解,repartition决定处理时的并行度DataFrame.有了这个定义,行为repartition(numPartitions: Int)很简单,但是对于参数的另外两个实现也是repartition如此partitionExprs: Column*.


所有事情都说,我的疑虑如下:

  • partitionBy方法一样,输入的顺序也与方法相关repartition吗?
  • 如果上述问题的答案是
    • :是否每个的并行执行中提取包含相同的数据,本来每个有我们运行一个SQL与查询GROUP BY在相同的列?
    • 是的:请解释repartition(columnExprs: Column*)方法的行为
  • 在第三个实现中使用numPartitions: IntpartitionExprs: Column*论证的相关性是什么repartition

use*_*411 7

这两种方法之间唯一的相似之处是它们的名称.有不同的东西,有不同的机制,所以你不应该比较它们.

话虽如此,repartition使用以下方式对数据进行洗牌:

  • 使用partitionExprs它在表达式中使用的列上使用散列分区器spark.sql.shuffle.partitions.
  • 使用partitionExprsnumPartitions它一样,与前一个相同,但重写spark.sql.shuffle.partitions.
  • 使用numPartitions它只需重新排列数据RoundRobinPartitioning.

在重新分区方法中列输入的顺序也是如此?

它是.hash((x, y))通常不一样hash((y, x)).

df = (spark.range(5, numPartitions=4).toDF("x")
    .selectExpr("cast(x as string)")
    .crossJoin(spark.range(5, numPartitions=4).toDF("y")))

df.repartition(4, "y", "x").rdd.glom().map(len).collect()
Run Code Online (Sandbox Code Playgroud)
[8, 6, 9, 2]
Run Code Online (Sandbox Code Playgroud)
df.repartition(4, "x", "y").rdd.glom().map(len).collect()
Run Code Online (Sandbox Code Playgroud)
[6, 4, 3, 12]
Run Code Online (Sandbox Code Playgroud)

如果我们在相同的列上运行带有GROUP BY的SQL查询,那么为并行执行提取的每个块是否包含与每个组中相同的数据?

取决于具体问题.

相关如何定义DataFrame的分区?