pyspark 是否会更改优化指令的顺序?

flp*_*lpn 8 python-3.x apache-spark pyspark

假设我有以下管道:

df.orderBy('foo').limit(10).show()
Run Code Online (Sandbox Code Playgroud)

在这里我们可以看到orderBy指令在前,因此limit在执行指令之前应该对数据帧的所有行进行排序。我发现自己在想如果火花做一些管道内“重组”,以改善服务表现(例如,在执行limit指令之前orderBy)。火花能做到吗?

abi*_*sis 6

你的假设是正确的。Spark 执行sort,然后limit在合并/收集结果之前在每个分区上执行,我们将在接下来看到。

一个orderBy接着limit会引起下一次调用:

通过查看该TakeOrderedAndProjectExec:doExecute()方法,我们将首先遇到下一个代码:

protected override def doExecute(): RDD[InternalRow] = {
    val ord = new LazilyGeneratedOrdering(sortOrder, child.output)
    val localTopK: RDD[InternalRow] = {
      child.execute().map(_.copy()).mapPartitions { iter =>
        org.apache.spark.util.collection.Utils.takeOrdered(iter, limit)(ord)
      }
    }

......

Run Code Online (Sandbox Code Playgroud)

在这里我们可以看到localTopK通过从每个排序分区中获取topK 第一条记录来填充。这意味着 Spark 尝试在分区级别尽快下推 topK 过滤器。

接下来的几行:

....

val shuffled = new ShuffledRowRDD(
      ShuffleExchangeExec.prepareShuffleDependency(
        localTopK,
        child.output,
        SinglePartition,
        serializer,
        writeMetrics),
      readMetrics)
    shuffled.mapPartitions { iter =>
      val topK = org.apache.spark.util.collection.Utils.takeOrdered(iter.map(_.copy()), limit)(ord)
      if (projectList != child.output) {
        val proj = UnsafeProjection.create(projectList, child.output)
        topK.map(r => proj(r))
      } else {
        topK
      }
    }
Run Code Online (Sandbox Code Playgroud)

ShuffledRowRDD将从所有分区中生成最终结果,这些分区将包含组成 的最终结果的最终 topK 排序记录limit

例子

让我们通过一个例子来说明这一点。考虑具有1,2,3...20划分为两部分的范围的数据集。第一个包含奇数,而第二个包含偶数,如下所示:

-----------   -----------
|   P1    |   |   P2    | 
-----------   -----------
|   1     |   |   2     |
|   3     |   |   4     |
|   5     |   |   6     |
|   7     |   |   8     |
|   9     |   |   10    |
|  ....   |   |  ....   |
|   19    |   |   20    |
-----------   -----------
Run Code Online (Sandbox Code Playgroud)

df.orderBy(...).limit(5)执行星火将获得前五名排序记录每个分区又名1-9为:第二个1日1和2-10。然后它将合并和排序它们又名序列1,2,3,4,5..10。最后,它将获得生成最终列表的前 5 条记录1,2,3,4,5

结论

Spark 利用所有可用信息,orderBy然后limit省略处理整个数据集,但只处理前 K 行。正如@ShemTov 已经提到的那样,从第 1 次调用limit之前不需要调用orderBy它会返回无效的数据集,第 2 次调用因为 Spark 会在内部为您进行所有必要的优化。