Why doesn't Spark push the filter down before the groupBy when collect_list is used?

Jor*_*tao 5 apache-spark

Consider this example:

import pyspark
import pyspark.sql.functions as f


with pyspark.SparkContext(conf=pyspark.SparkConf().setMaster('local[*]')) as sc:
    spark = pyspark.sql.SQLContext(sc)

    df = spark.createDataFrame([
        [2020, 1, 1, 1.0],
        [2020, 1, 2, 2.0],
        [2020, 1, 3, 3.0],
    ], schema=['year', 'id', 't', 'value'])

    df = df.groupBy(['year', 'id']).agg(f.collect_list('value'))
    df = df.where(f.col('year') == 2020)
    df.explain()

This produces the following plan:

== Physical Plan ==
*(2) Filter (isnotnull(year#0L) AND (year#0L = 2020))
+- ObjectHashAggregate(keys=[year#0L, id#1L], functions=[collect_list(value#3, 0, 0)])
   +- Exchange hashpartitioning(year#0L, id#1L, 200), true, [id=#23]
      +- ObjectHashAggregate(keys=[year#0L, id#1L], functions=[partial_collect_list(value#3, 0, 0)])
         +- *(1) Project [year#0L, id#1L, value#3]
            +- *(1) Scan ExistingRDD[year#0L,id#1L,t#2L,value#3]

I would expect Spark to push the filter year = 2020 down below the hashpartitioning Exchange. If the aggregate function is sum, Spark does this, but it does not for collect_list.

Any ideas why this is the case, and whether there is a way to get around it?

The reason I ask is that without the pushdown, a statement covering three years (e.g. year IN (2020, 2019, 2018)) shuffles all of them together. Also, I need to express the filter after the groupBy in my code.

More importantly, I am trying to understand why Spark pushes the filter down for some aggregations but not for others.
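For comparison, here is a minimal sketch with sum instead of collect_list (it reuses spark and f from the example above; the name df2 is just for this sketch). As noted, the filter is then pushed below the Exchange:

df2 = spark.createDataFrame([
    [2020, 1, 1, 1.0],
    [2020, 1, 2, 2.0],
    [2020, 1, 3, 3.0],
], schema=['year', 'id', 't', 'value'])

# Same pipeline, but with a deterministic aggregate: sum instead of collect_list.
df2 = df2.groupBy(['year', 'id']).agg(f.sum('value'))
df2 = df2.where(f.col('year') == 2020)
df2.explain()  # the Filter now appears below the Exchange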

Som*_*Som 4

Let's take a look at the aggregate function you are using.

collect_list

From the documentation below:

/**
   * Aggregate function: returns a list of objects with duplicates.
   *
   * @note The function is non-deterministic because the order of collected results depends
   * on the order of the rows which may be non-deterministic after a shuffle.
   *
   * @group agg_funcs
   * @since 1.6.0
   */
  def collect_list(columnName: String): Column = collect_list(Column(columnName))

collect_list is a non-deterministic operation; its result depends on the order of the rows.
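To make this concrete, a small sketch of my own (assuming df is the un-aggregated DataFrame from the question): handing the same rows to collect_list in a different order can yield differently ordered lists, whereas sum gives the same answer either way.

asc = df.orderBy('t').groupBy('year', 'id').agg(f.collect_list('value'))
desc = df.orderBy(f.desc('t')).groupBy('year', 'id').agg(f.collect_list('value'))
# The collected lists may come back e.g. as [1.0, 2.0, 3.0] vs. [3.0, 2.0, 1.0],
# while f.sum('value') would be 6.0 in both cases.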

Now look at Optimizer.scala#PushPredicateThroughNonJoin:

// SPARK-13473: We can't push the predicate down when the underlying projection output non-
    // deterministic field(s).  Non-deterministic expressions are essentially stateful. This
    // implies that, for a given input row, the output are determined by the expression's initial
    // state and all the input rows processed before. In another word, the order of input rows
    // matters for non-deterministic expressions, while pushing down predicates changes the order.
    // This also applies to Aggregate.

Since the operation above is non-deterministic, i.e. its result depends on the row order of the underlying DataFrame, Spark cannot push the predicate down, because pushing it through the aggregate would change the order of the rows.
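As for working around it (this is a sketch of my own, not something the answer spells out): since the predicate only references the grouping column year, you can apply it to the raw DataFrame yourself before the groupBy, which keeps the other years out of the shuffle entirely.

# Workaround sketch, assuming df is the un-aggregated DataFrame with columns
# year, id, t, value. The filter only touches a grouping column, so the result
# is unchanged, but rows for the excluded years never reach the shuffle.
years = [2020, 2019, 2018]
filtered = df.where(f.col('year').isin(years))
result = filtered.groupBy(['year', 'id']).agg(f.collect_list('value'))
result.explain()  # the Filter now sits below the Exchange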