在spark中将dataframe转换为rdd的成本

Pri*_*hak 3 apache-spark rdd apache-spark-sql

我正在尝试使用以下方法获取数据帧的分区数:

df.rdd.getNumPartitions.toString
Run Code Online (Sandbox Code Playgroud)

但是当我监控 Spark 日志时,我发现它会启动许多阶段,并且是一项成本高昂的操作。 在此输入图像描述

根据我的理解,dataframe通过元数据为rdd添加了一个结构层。那么,为什么在转换为 rdd 时剥离它需要这么多时间呢?

SCo*_*uto 7

DataFrame 是一个优化的分布式表格集合。由于它保留表格格式(类似于 SQL 表),因此它可以保留元数据以允许 Spark 在幕后执行一些优化。

这种优化是由CatalystTungsten等副项目执行的

RDD 不维护任何 schema,如果需要,您需要提供一个 schema。所以RDD没有Dataframe那么高度优化,(Catalyst根本不参与)

将 DataFrame 转换为 RDD 会强制 Spark 循环所有元素,将它们从高度优化的 Catalyst 空间转换为 scala 空间。

检查代码来自.rdd

  lazy val rdd: RDD[T] = {
    val objectType = exprEnc.deserializer.dataType
    rddQueryExecution.toRdd.mapPartitions { rows =>
      rows.map(_.get(0, objectType).asInstanceOf[T])
    }
  }

@transient private lazy val rddQueryExecution: QueryExecution = {
    val deserialized = CatalystSerde.deserialize[T](logicalPlan)
    sparkSession.sessionState.executePlan(deserialized)
  }
Run Code Online (Sandbox Code Playgroud)

因此,首先,它执行计划并检索输出RDD[InternalRow],顾名思义,该输出仅供内部使用,需要转换为RDD[Row]

然后它循环遍历转换它们的所有行。正如您所看到的,这不仅仅是删除架构

希望能回答您的问题。