Dataset.rdd 是动作还是转换?

lin*_*hrr 4 apache-spark apache-spark-sql

评估数据帧是否为空的一种方法是执行df.rdd.isEmpty(),但是,我rdd at mycode.scala:123在 sparkUI 执行中看到。这让我想知道这个 rdd() 函数是否实际上是一个动作而不是一个转换。

我知道 isEmpty() 是一个动作,但我确实看到了一个单独的阶段 where isEmpty() at mycode.scala:234,所以我认为它们是不同的动作?

Jac*_*ski 6

rdd生成以“RDD 术语”表示结构化查询,以便 Spark 可以执行它。它是您类型的 JVM 对象的 RDD T。如果使用不当会导致内存问题,因为:

  1. 将位于 JVM 外部的内部管理的优化行传输到 JVM 中的内存空间

  2. 将二进制行转换为您的业务对象(JVM“真实”表示)

第一个将增加计算所需的 JVM 内存,而后者是一个额外的转换步骤。

对于这样一个计算行数的简单计算,您宁愿坚持count优化且相当便宜的计算(可以避免复制对象和应用模式)。

在内部,Dataset将行保留在其InternalRow. 这降低了 Spark 应用程序的 JVM 内存需求。计算 RDD (from rdd) 以表示一旦执行 Spark 操作要执行的 Spark 转换。

请注意,执行rdd会创建一个 RDD 并且也需要一些计算。

所以,是的,rdd可能被认为是一个动作,因为它“执行”了查询(即位于后面的数据集的物理计划),但最终它只是给出了一个RDD(所以它不能是一个动作,因为 Spark操作返回非 RDD 值)。

正如您在代码中看到的:

  lazy val rdd: RDD[T] = {
    val objectType = exprEnc.deserializer.dataType
    val deserialized = CatalystSerde.deserialize[T](logicalPlan) // <-- HERE see explanation below
    sparkSession.sessionState.executePlan(deserialized).toRdd.mapPartitions { rows =>
      rows.map(_.get(0, objectType).asInstanceOf[T])
    }
  }
Run Code Online (Sandbox Code Playgroud)

rdd 是惰性计算的,并且只计算一次。


评估数据框是否为空的一种方法是做 df.rdd.isEmpty()

我想知道你在哪里找到的。我只是

count(): Long返回数据集中的行数。

toRdd 懒惰值

如果您坚持使用相当低的级别来检查您的数据集是否为空,我宁愿使用它Dataset.queryExecution.toRdd。这几乎就像rdd没有这种额外的复制和应用模式一样。

df.queryExecution.toRdd.isEmpty
Run Code Online (Sandbox Code Playgroud)

比较以下 RDD 谱系并思考哪个看起来更好。

val dataset = spark.range(5).withColumn("group", 'id % 2)
scala> dataset.rdd.toDebugString
res1: String =
(8) MapPartitionsRDD[8] at rdd at <console>:26 [] // <-- extra deserialization step
 |  MapPartitionsRDD[7] at rdd at <console>:26 []
 |  MapPartitionsRDD[6] at rdd at <console>:26 []
 |  MapPartitionsRDD[5] at rdd at <console>:26 []
 |  ParallelCollectionRDD[4] at rdd at <console>:26 []

// Compare with a more memory-optimized alternative
// Avoids copies and has no schema
scala> dataset.queryExecution.toRdd.toDebugString
res2: String =
(8) MapPartitionsRDD[11] at toRdd at <console>:26 []
|  MapPartitionsRDD[10] at toRdd at <console>:26 []
|  ParallelCollectionRDD[9] at toRdd at <console>:26 []
Run Code Online (Sandbox Code Playgroud)

从 Spark 的角度来看,转换相当便宜,因为它们不会引起任何混洗,但考虑到计算之间的内存需求变化,我会使用后者(使用toRdd)。

rdd 懒惰值

rdd用 JVM 类型的行表示Dataset作为(延迟创建的)的内容。RDDT

rdd: RDD[T]
Run Code Online (Sandbox Code Playgroud)

正如您在源代码(粘贴在上面)中看到的那样,最后的请求rdd将触发一个额外的计算来获取 RDD。

  1. 创建一个新的逻辑计划来反序列化数据集的逻辑计划,即您从 JVM 外部管理的内部二进制行格​​式获得额外的反序列化,使其对应的表示为 JVM内部的JVM对象(想想您应该不惜一切代价避免的 GC)