相关疑难解决方法(0)

为什么Spark DataFrame转换为RDD需要完全重新映射?

来自Spark的源代码:

/**
   * Represents the content of the Dataset as an `RDD` of `T`.
   *
   * @group basic
   * @since 1.6.0
   */
  lazy val rdd: RDD[T] = {
    val objectType = exprEnc.deserializer.dataType
    rddQueryExecution.toRdd.mapPartitions { rows =>
      rows.map(_.get(0, objectType).asInstanceOf[T])
    }
  }
Run Code Online (Sandbox Code Playgroud)

https://github.com/apache/spark/blob/master/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala#L2972

mapPartitions只要可以作为计算时间RDD摆在首位.所以这使得诸如操作

df.rdd.getNumPartitions
Run Code Online (Sandbox Code Playgroud)

非常贵.考虑到一个 DataFrameDataSet[Row]和一个DataSetRDD就是为什么需要重新映射?任何见解都表示赞赏.

scala apache-spark

10
推荐指数
1
解决办法
337
查看次数

如何在不产生.rdd成本的情况下检查Spark DataFrame的分区数

关于如何获得RDDa和/或a 的分区数量有很多问题DataFrame:答案总是如下:

 rdd.getNumPartitions
Run Code Online (Sandbox Code Playgroud)

要么

 df.rdd.getNumPartitions
Run Code Online (Sandbox Code Playgroud)

不幸的是,这是一个昂贵的操作,DataFrame因为

 df.rdd
Run Code Online (Sandbox Code Playgroud)

需要转换DataFramerdd.这是运行所需时间的顺序

 df.count
Run Code Online (Sandbox Code Playgroud)

我正在编写逻辑,可选择 repartition "s"或coalesce"sa" DataFrame- 基于当前分区数是否在可接受的值范围内,或者低于或高于它们.

  def repartition(inDf: DataFrame, minPartitions: Option[Int],
       maxPartitions: Option[Int]): DataFrame = {
    val inputPartitions= inDf.rdd.getNumPartitions  // EXPENSIVE!
    val outDf = minPartitions.flatMap{ minp =>
      if (inputPartitions < minp) {
        info(s"Repartition the input from $inputPartitions to $minp partitions..")
        Option(inDf.repartition(minp))
      } else {
        None
      }
    }.getOrElse( maxPartitions.map{ maxp =>
      if (inputPartitions > maxp) …
Run Code Online (Sandbox Code Playgroud)

scala partition apache-spark

4
推荐指数
1
解决办法
2897
查看次数

标签 统计

apache-spark ×2

scala ×2

partition ×1