pal*_*upz 7 apache-spark apache-spark-sql pyspark
我在这里浏览了文档:https : //spark.apache.org/docs/latest/api/python/pyspark.sql.html
它说:
和前面的问题也提到了它。但是,我仍然不明白它们究竟有何不同,以及在选择其中一个时会产生什么影响?
更重要的是,如果 repartition 进行哈希分区,提供列作为其参数有什么影响?
mik*_*ike 11
我认为最好通过一些实验来研究差异。
对于这个实验,我使用了以下两个数据帧(我在 Scala 中展示了代码,但概念与 Python API 相同):
// Dataframe with one column "value" containing the values ranging from 0 to 1000000
val df = Seq(0 to 1000000: _*).toDF("value")
// Dataframe with one column "value" containing 1000000 the number 0 in addition to the numbers 5000, 10000 and 100000
val df2 = Seq((0 to 1000000).map(_ => 0) :+ 5000 :+ 10000 :+ 100000: _*).toDF("value")
Run Code Online (Sandbox Code Playgroud)
repartition应用HashPartitioner何时提供一列或多列,以及RoundRobinPartitioner在提供的分区数量上均匀分布数据。如果提供了一列(或更多),这些值将被散列并用于通过计算类似的东西来确定分区号partition = hash(columns) % numberOfPartitions。
repartitionByRange将根据列值的范围对数据进行分区。这通常用于连续(非离散)值,例如任何类型的数字。请注意,由于性能原因,此方法使用采样来估计范围。因此,输出可能不一致,因为采样可能返回不同的值。样本大小可以由 config 控制spark.sql.execution.rangeExchange.sampleSizePerPartition。
还值得一提的是,对于这两种方法,如果没有numPartitions给出,默认情况下它会将 Dataframe 数据分区到spark.sql.shuffle.partitions您的 Spark 会话中配置,并且可以通过自适应查询执行(自 Spark 3.x 起可用)合并。
基于给定的 Testdata 我总是应用相同的代码:
val testDf = df
// here I will insert the partition logic
.withColumn("partition", spark_partition_id()) // applying SQL built-in function to determin actual partition
.groupBy(col("partition"))
.agg(
count(col("value")).as("count"),
min(col("value")).as("min_value"),
max(col("value")).as("max_value"))
.orderBy(col("partition"))
testDf.show(false)
Run Code Online (Sandbox Code Playgroud)
正如预期的那样,我们得到了 4 个分区,因为 的值df范围从 0 到 1000000,我们看到它们的散列值将产生一个分布良好的 Dataframe。
+---------+------+---------+---------+
|partition|count |min_value|max_value|
+---------+------+---------+---------+
|0 |249911|12 |1000000 |
|1 |250076|6 |999994 |
|2 |250334|2 |999999 |
|3 |249680|0 |999998 |
+---------+------+---------+---------+
Run Code Online (Sandbox Code Playgroud)
同样在这种情况下,我们得到 4 个分区,但这次最小值和最大值清楚地显示了分区内的值范围。它几乎均匀分布,每个分区有 250000 个值。
+---------+------+---------+---------+
|partition|count |min_value|max_value|
+---------+------+---------+---------+
|0 |244803|0 |244802 |
|1 |255376|244803 |500178 |
|2 |249777|500179 |749955 |
|3 |250045|749956 |1000000 |
+---------+------+---------+---------+
Run Code Online (Sandbox Code Playgroud)
现在,我们正在使用另一个 Dataframe df2。这里,散列算法对只有 0、5000、10000 或 100000 的值进行散列。当然,值 0 的散列将始终相同,因此所有零最终都在同一个分区中(在这种情况下,分区 3 )。其他两个分区只包含一个值。
+---------+-------+---------+---------+
|partition|count |min_value|max_value|
+---------+-------+---------+---------+
|0 |1 |100000 |100000 |
|1 |1 |10000 |10000 |
|2 |1 |5000 |5000 |
|3 |1000001|0 |0 |
+---------+-------+---------+---------+
Run Code Online (Sandbox Code Playgroud)
如果不使用“value”列的内容,该repartition方法将在 RoundRobin 的基础上分发消息。所有分区的数据量几乎相同。
+---------+------+---------+---------+
|partition|count |min_value|max_value|
+---------+------+---------+---------+
|0 |250002|0 |5000 |
|1 |250002|0 |10000 |
|2 |249998|0 |100000 |
|3 |250002|0 |0 |
+---------+------+---------+---------+
Run Code Online (Sandbox Code Playgroud)
这种情况表明数据帧df2没有很好地定义用于按范围重新分区,因为几乎所有值都是 0。因此,我们甚至最终只有两个分区,而分区 0 包含所有零。
+---------+-------+---------+---------+
|partition|count |min_value|max_value|
+---------+-------+---------+---------+
|0 |1000001|0 |0 |
|1 |3 |5000 |100000 |
+---------+-------+---------+---------+
Run Code Online (Sandbox Code Playgroud)
通过使用df.explain您可以获得有关这些操作的大量信息。
我使用这个 DataFrame 作为示例:
\ndf = spark.createDataFrame([(i, f"value {i}") for i in range(1, 22, 1)], ["id", "value"])\nRun Code Online (Sandbox Code Playgroud)\n根据是否指定键表达式(列),分区方法会有所不同。正如你所说,它并不总是哈希分区。
\ndf.repartition(3).explain(True)\n\n== Parsed Logical Plan ==\nRepartition 3, true\n+- LogicalRDD [id#0L, value#1], false\n\n== Analyzed Logical Plan ==\nid: bigint, value: string\nRepartition 3, true\n+- LogicalRDD [id#0L, value#1], false\n\n== Optimized Logical Plan ==\nRepartition 3, true\n+- LogicalRDD [id#0L, value#1], false\n\n== Physical Plan ==\nExchange RoundRobinPartitioning(3)\n+- Scan ExistingRDD[id#0L,value#1]\nRun Code Online (Sandbox Code Playgroud)\n我们可以在生成的使用的物理计划中看到RoundRobinPartitioning:
\n\n表示一种分区,其中行从随机目标分区号开始\n并以循环方式分布\n,从而在输出分区之间均匀分布。在实现 DataFrame.repartition() 运算符时使用此分区。
\n
当使用按列表达式重新分区时:
\ndf.repartition(3, "id").explain(True)\n\n== Parsed Logical Plan ==\n\'RepartitionByExpression [\'id], 3\n+- LogicalRDD [id#0L, value#1], false\n\n== Analyzed Logical Plan ==\nid: bigint, value: string\nRepartitionByExpression [id#0L], 3\n+- LogicalRDD [id#0L, value#1], false\n\n== Optimized Logical Plan ==\nRepartitionByExpression [id#0L], 3\n+- LogicalRDD [id#0L, value#1], false\n\n== Physical Plan ==\nExchange hashpartitioning(id#0L, 3)\n+- Scan ExistingRDD[id#0L,value#1]\nRun Code Online (Sandbox Code Playgroud)\n现在选择的分区方法是。\n在散列分区方法中,正在为每个键表达式计算hashpartitioningJava ,以通过计算模数来确定目的地:。Object.hashCodepartition_idkey.hashCode % numPartitions
numPartitions此分区方法根据分区键创建连续且不重叠的值范围。因此,至少需要一个键表达式并且需要是可排序的。
df.repartitionByRange(3, "id").explain(True)\n\n== Parsed Logical Plan ==\n\'RepartitionByExpression [\'id ASC NULLS FIRST], 3\n+- LogicalRDD [id#0L, value#1], false\n\n== Analyzed Logical Plan ==\nid: bigint, value: string\nRepartitionByExpression [id#0L ASC NULLS FIRST], 3\n+- LogicalRDD [id#0L, value#1], false\n\n== Optimized Logical Plan ==\nRepartitionByExpression [id#0L ASC NULLS FIRST], 3\n+- LogicalRDD [id#0L, value#1], false\n\n== Physical Plan ==\nExchange rangepartitioning(id#0L ASC NULLS FIRST, 3)\n+- Scan ExistingRDD[id#0L,value#1]\nRun Code Online (Sandbox Code Playgroud)\n查看生成的物理计划,我们可以看到它rangepartitioning与上述其他两个计划的不同之处在于分区表达式中存在排序子句。当表达式中没有明确指定排序顺序时,默认使用升序。
| 归档时间: |
|
| 查看次数: |
2406 次 |
| 最近记录: |