如何在不产生.rdd成本的情况下检查Spark DataFrame的分区数

jav*_*dba 4 scala partition apache-spark

关于如何获得RDDa和/或a 的分区数量有很多问题DataFrame:答案总是如下:

 rdd.getNumPartitions
Run Code Online (Sandbox Code Playgroud)

要么

 df.rdd.getNumPartitions
Run Code Online (Sandbox Code Playgroud)

不幸的是,这是一个昂贵的操作,DataFrame因为

 df.rdd
Run Code Online (Sandbox Code Playgroud)

需要转换DataFramerdd.这是运行所需时间的顺序

 df.count
Run Code Online (Sandbox Code Playgroud)

我正在编写逻辑,可选择 repartition "s"或coalesce"sa" DataFrame- 基于当前分区数是否在可接受的值范围内,或者低于或高于它们.

  def repartition(inDf: DataFrame, minPartitions: Option[Int],
       maxPartitions: Option[Int]): DataFrame = {
    val inputPartitions= inDf.rdd.getNumPartitions  // EXPENSIVE!
    val outDf = minPartitions.flatMap{ minp =>
      if (inputPartitions < minp) {
        info(s"Repartition the input from $inputPartitions to $minp partitions..")
        Option(inDf.repartition(minp))
      } else {
        None
      }
    }.getOrElse( maxPartitions.map{ maxp =>
      if (inputPartitions > maxp) {
        info(s"Coalesce the input from $inputPartitions to $maxp partitions..")
        inDf.coalesce(maxp)
      } else inDf
    }.getOrElse(inDf))
    outDf
  }
Run Code Online (Sandbox Code Playgroud)

但是,我们不能承担的成本rdd.getNumPartitions每一个 DataFrame以这种方式.

请问有没有什么办法可以获取这些信息-例如从查询在线/暂时catalogregistered,也许表?

更新 Spark GUI将DataFrame.rdd操作显示为与作业中最长的sql一样长.我将重新运行这项工作并在此处附上屏幕截图.

以下只是一个测试用例:它使用的是生产中数据大小的一小部分.最长的sql是只有五分钟-而这一次的道路上花费的时间量,以及(注意,sql不是帮助了这里:它也有执行后续从而有效地加倍累积执行时间).

在此输入图像描述

我们可以看到,.rdd在操作DataFrameUtils线30(在上面的片段所示)采用5.1mins -但在save操作仍然采取5.2分钟后-即我们并没有做保存的任何时间.rdd在随后的执行时间save.

use*_*362 10

rdd组件没有固有成本rdd.getNumPartitions,因为RDD从不评估返回.

虽然您可以根据经验轻松确定这一点,但使用调试器(我会将其作为读者的练习),或者确定在基本情况下不会触发任何作业

Spark session available as 'spark'.
Welcome to
      ____              __
     / __/__  ___ _____/ /__
    _\ \/ _ \/ _ `/ __/  '_/
   /___/ .__/\_,_/_/ /_/\_\   version 2.4.0
      /_/

Using Scala version 2.11.12 (OpenJDK 64-Bit Server VM, Java 1.8.0_181)
Type in expressions to have them evaluated.
Type :help for more information.
Run Code Online (Sandbox Code Playgroud)
scala> val ds = spark.read.text("README.md")
ds: org.apache.spark.sql.DataFrame = [value: string]

scala> ds.rdd.getNumPartitions
res0: Int = 1

scala> spark.sparkContext.statusTracker.getJobIdsForGroup(null).isEmpty // Check if there are any known jobs
res1: Boolean = true
Run Code Online (Sandbox Code Playgroud)

说服你可能还不够.让我们以更系统的方式解决这个问题:

  • rdd返回a MapPartitionRDD(ds如上定义):

    scala> ds.rdd.getClass
    res2: Class[_ <: org.apache.spark.rdd.RDD[org.apache.spark.sql.Row]] = class org.apache.spark.rdd.MapPartitionsRDD
    
    Run Code Online (Sandbox Code Playgroud)
  • RDD.getNumPartitions 调用RDD.partitions.

  • 在非检查点场景中RDD.partitions调用getPartitions(随意跟踪检查点路径).
  • RDD.getPartitions 是抽象的.
  • 因此,在这种情况下使用的实际实现是MapPartitionsRDD.getPartitions,它只是将调用委托给父代.
  • 只有源和源MapPartitionsRDD之间rdd.

    scala> ds.rdd.toDebugString
    res3: String =
    (1) MapPartitionsRDD[3] at rdd at <console>:26 []
     |  MapPartitionsRDD[2] at rdd at <console>:26 []
     |  MapPartitionsRDD[1] at rdd at <console>:26 []
     |  FileScanRDD[0] at rdd at <console>:26 []
    
    Run Code Online (Sandbox Code Playgroud)

    同样,如果Dataset包含交换,我们将跟随父母到最近的shuffle:

    scala> ds.orderBy("value").rdd.toDebugString
    res4: String =
    (67) MapPartitionsRDD[13] at rdd at <console>:26 []
     |   MapPartitionsRDD[12] at rdd at <console>:26 []
     |   MapPartitionsRDD[11] at rdd at <console>:26 []
     |   ShuffledRowRDD[10] at rdd at <console>:26 []
     +-(1) MapPartitionsRDD[9] at rdd at <console>:26 []
        |  MapPartitionsRDD[5] at rdd at <console>:26 []
        |  FileScanRDD[4] at rdd at <console>:26 []
    
    Run Code Online (Sandbox Code Playgroud)

    请注意,这种情况特别有趣,因为我们实际上触发了一项工作:

    scala> spark.sparkContext.statusTracker.getJobIdsForGroup(null).isEmpty
    res5: Boolean = false
    
    scala> spark.sparkContext.statusTracker.getJobIdsForGroup(null)
    res6: Array[Int] = Array(0)
    
    Run Code Online (Sandbox Code Playgroud)

    这是因为我们遇到的,其中分区不能静态确定方案(请参阅数据帧分区数排序后?为什么sortBy转型引发火花的工作吗?).

    在这种情况下getNumPartitions也会触发一项工作:

    scala> ds.orderBy("value").rdd.getNumPartitions
    res7: Int = 67
    
    scala> spark.sparkContext.statusTracker.getJobIdsForGroup(null)  // Note new job id
    res8: Array[Int] = Array(1, 0)
    
    Run Code Online (Sandbox Code Playgroud)

    但这并不意味着观察到的成本与.rdd呼叫有某种关系.相反partitions,如果没有静态公式(例如,某些Hadoop输入格式,需要对数据进行全面扫描),则查找的固有成本.

请注意,此处提出的要点不应推断为其他应用程序Dataset.rdd.例如ds.rdd.count,确实昂贵且浪费.