flp*_*lpn 8 python-3.x apache-spark pyspark
假设我有以下管道:
df.orderBy('foo').limit(10).show()
Run Code Online (Sandbox Code Playgroud)
在这里我们可以看到orderBy指令在前,因此limit在执行指令之前应该对数据帧的所有行进行排序。我发现自己在想如果火花做一些管道内“重组”,以改善服务表现(例如,在执行limit指令之前的orderBy)。火花能做到吗?
你的假设是正确的。Spark 执行sort,然后limit在合并/收集结果之前在每个分区上执行,我们将在接下来看到。
一个orderBy接着limit会引起下一次调用:
通过查看该TakeOrderedAndProjectExec:doExecute()方法,我们将首先遇到下一个代码:
protected override def doExecute(): RDD[InternalRow] = {
val ord = new LazilyGeneratedOrdering(sortOrder, child.output)
val localTopK: RDD[InternalRow] = {
child.execute().map(_.copy()).mapPartitions { iter =>
org.apache.spark.util.collection.Utils.takeOrdered(iter, limit)(ord)
}
}
......
Run Code Online (Sandbox Code Playgroud)
在这里我们可以看到localTopK通过从每个排序分区中获取topK 第一条记录来填充。这意味着 Spark 尝试在分区级别尽快下推 topK 过滤器。
接下来的几行:
....
val shuffled = new ShuffledRowRDD(
ShuffleExchangeExec.prepareShuffleDependency(
localTopK,
child.output,
SinglePartition,
serializer,
writeMetrics),
readMetrics)
shuffled.mapPartitions { iter =>
val topK = org.apache.spark.util.collection.Utils.takeOrdered(iter.map(_.copy()), limit)(ord)
if (projectList != child.output) {
val proj = UnsafeProjection.create(projectList, child.output)
topK.map(r => proj(r))
} else {
topK
}
}
Run Code Online (Sandbox Code Playgroud)
ShuffledRowRDD将从所有分区中生成最终结果,这些分区将包含组成 的最终结果的最终 topK 排序记录limit。
例子
让我们通过一个例子来说明这一点。考虑具有1,2,3...20划分为两部分的范围的数据集。第一个包含奇数,而第二个包含偶数,如下所示:
----------- -----------
| P1 | | P2 |
----------- -----------
| 1 | | 2 |
| 3 | | 4 |
| 5 | | 6 |
| 7 | | 8 |
| 9 | | 10 |
| .... | | .... |
| 19 | | 20 |
----------- -----------
Run Code Online (Sandbox Code Playgroud)
当df.orderBy(...).limit(5)执行星火将获得前五名排序记录每个分区又名1-9为:第二个1日1和2-10。然后它将合并和排序它们又名序列1,2,3,4,5..10。最后,它将获得生成最终列表的前 5 条记录1,2,3,4,5。
结论
Spark 利用所有可用信息,orderBy然后limit省略处理整个数据集,但只处理前 K 行。正如@ShemTov 已经提到的那样,从第 1 次调用limit之前不需要调用orderBy它会返回无效的数据集,第 2 次调用因为 Spark 会在内部为您进行所有必要的优化。
| 归档时间: |
|
| 查看次数: |
1084 次 |
| 最近记录: |