为什么Spark DataFrame转换为RDD需要完全重新映射?

jav*_*dba 10 scala apache-spark

来自Spark的源代码:

/**
   * Represents the content of the Dataset as an `RDD` of `T`.
   *
   * @group basic
   * @since 1.6.0
   */
  lazy val rdd: RDD[T] = {
    val objectType = exprEnc.deserializer.dataType
    rddQueryExecution.toRdd.mapPartitions { rows =>
      rows.map(_.get(0, objectType).asInstanceOf[T])
    }
  }
Run Code Online (Sandbox Code Playgroud)

https://github.com/apache/spark/blob/master/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala#L2972

mapPartitions只要可以作为计算时间RDD摆在首位.所以这使得诸如操作

df.rdd.getNumPartitions
Run Code Online (Sandbox Code Playgroud)

非常贵.考虑到一个 DataFrameDataSet[Row]和一个DataSetRDD就是为什么需要重新映射?任何见解都表示赞赏.

use*_*362 8

TL; DR那是因为内部RDD不是RDD[Row].

鉴于DataFrame是DataSet[Row]和一个DataSet由RDD组成

这是一个巨大的过度简化.首先,DataSet[T] 并不意味着你与容器的互动T.这意味着如果您使用类似集合的API(通常称为强类型),内部表示将被解码T.

内部表示是由Tungsten.This表示内部使用二进制格式是内部和变化和太低的水平受试者在实践中使用.

中间表示,它公开这个数据是InternalRow- rddQueryExecution.toRDD实际上RDD[InternalRow].这种表示(有不同的实现)仍然暴露内部类型,被认为是"弱"私有,因为所有对象o.a.s.sql.catalyst(访问没有明确限制,但API没有记录),而且很难与之交互.

这就是解码发挥作用以及为什么需要完全"重新映射" - 将内部(通常是不安全的)对象转换为用于公共用途的外部类型.

最后,重申我之前的陈述 - 有问题的代码在getNumPartitions被调用时不会被执行.