相关疑难解决方法(0)

为什么Spark DataFrame转换为RDD需要完全重新映射?

来自Spark的源代码:

/**
   * Represents the content of the Dataset as an `RDD` of `T`.
   *
   * @group basic
   * @since 1.6.0
   */
  lazy val rdd: RDD[T] = {
    val objectType = exprEnc.deserializer.dataType
    rddQueryExecution.toRdd.mapPartitions { rows =>
      rows.map(_.get(0, objectType).asInstanceOf[T])
    }
  }
Run Code Online (Sandbox Code Playgroud)

https://github.com/apache/spark/blob/master/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala#L2972

mapPartitions只要可以作为计算时间RDD摆在首位.所以这使得诸如操作

df.rdd.getNumPartitions
Run Code Online (Sandbox Code Playgroud)

非常贵.考虑到一个 DataFrameDataSet[Row]和一个DataSetRDD就是为什么需要重新映射?任何见解都表示赞赏.

scala apache-spark

10
推荐指数
1
解决办法
337
查看次数

为什么sortBy转换会触发Spark作业?

根据Spark文档,只有RDD操作可以触发Spark作业,并且在对其调用操作时会对延迟进行转换评估.

我看到sortBy转换函数立即应用,它在SparkUI中显示为作业触发器.为什么?

partitioning partitioner apache-spark rdd

9
推荐指数
2
解决办法
1380
查看次数

排序后的数据帧分区数?

spark如何确定使用后的分区数orderBy?我一直认为生成的数据框有spark.sql.shuffle.partitions,但这似乎不是真的:

val df = (1 to 10000).map(i => ("a",i)).toDF("n","i").repartition(10).cache

df.orderBy($"i").rdd.getNumPartitions // = 200 (=spark.sql.shuffle.partitions)
df.orderBy($"n").rdd.getNumPartitions // = 2 
Run Code Online (Sandbox Code Playgroud)

在这两种情况下,spark 都可以+- Exchange rangepartitioning(i/n ASC NULLS FIRST, 200),那么第二种情况下的分区数怎么会是 2?

apache-spark apache-spark-sql

5
推荐指数
1
解决办法
1956
查看次数

在spark中将dataframe转换为rdd的成本

我正在尝试使用以下方法获取数据帧的分区数:

df.rdd.getNumPartitions.toString
Run Code Online (Sandbox Code Playgroud)

但是当我监控 Spark 日志时,我发现它会启动许多阶段,并且是一项成本高昂的操作。 在此输入图像描述

根据我的理解,dataframe通过元数据为rdd添加了一个结构层。那么,为什么在转换为 rdd 时剥离它需要这么多时间呢?

apache-spark rdd apache-spark-sql

3
推荐指数
1
解决办法
1795
查看次数