相关疑难解决方法(0)

如何在Spark中强制进行DataFrame评估

有时(例如,测试和bechmarking)我想强制执行在DataFrame上定义的转换.调用类似动作的AFAIK count并不能确保所有Columns实际计算,show只能计算所有动作的子集Rows(参见下面的示例)

我的解决方案是使用写入DataFrameHDFS df.write.saveAsTable,但这会使我的系统"混乱"我不希望继续使用的表.

那么触发评估的最佳方法是DataFrame什么?

编辑:

请注意,最近还讨论了spark开发者列表:http://apache-spark-developers-list.1001551.n3.nabble.com/Will-count-always-trigger-an-evaluation-of-each-行td21018.html

我做了一个小例子,它表明counton DataFrame不会评估所有内容(使用Spark 1.6.3和spark-master =测试local[2]):

val df = sc.parallelize(Seq(1)).toDF("id")
val myUDF = udf((i:Int) => {throw new RuntimeException;i})

df.withColumn("test",myUDF($"id")).count // runs fine
df.withColumn("test",myUDF($"id")).show() // gives Exception
Run Code Online (Sandbox Code Playgroud)

使用相同的逻辑,这里是一个show不评估所有行的示例:

val df = sc.parallelize(1 to 10).toDF("id")
val myUDF = udf((i:Int) => {if(i==10) throw new RuntimeException;i})

df.withColumn("test",myUDF($"id")).show(5) // runs fine
df.withColumn("test",myUDF($"id")).show(10) // gives Exception
Run Code Online (Sandbox Code Playgroud)

编辑2:对于Eliasah:例外情况说:

org.apache.spark.SparkException: Job …
Run Code Online (Sandbox Code Playgroud)

scala apache-spark

15
推荐指数
2
解决办法
6552
查看次数

如何知道哪个计数查询最快?

我一直在探索最近发布的Spark SQL 2.3.0-SNAPSHOT中的查询优化,并注意到语义相同查询的不同物理计划.

我们假设我必须计算以下数据集中的行数:

val q = spark.range(1)
Run Code Online (Sandbox Code Playgroud)

我可以按如下方式计算行数:

  1. q.count
  2. q.collect.size
  3. q.rdd.count
  4. q.queryExecution.toRdd.count

我最初的想法是它几乎是一个恒定的操作(肯定是由于本地数据集),它会以某种方式由Spark SQL优化并立即给出结果,尤其是.Spark SQL完全控制查询执行的第一个.

看了查询的物理计划后,我就相信最有效的查询将是最后一个:

q.queryExecution.toRdd.count
Run Code Online (Sandbox Code Playgroud)

原因是:

  1. 它避免了从InternalRow二进制格式反序列化行
  2. 查询是代码
  3. 一个阶段只有一个工作

物理计划就这么简单.

工作细节

我的推理是否正确?如果是这样,如果我从外部数据源(例如文件,JDBC,Kafka)读取数据集,答案会有所不同吗?

主要问题是,要考虑查询是否比其他查询更有效(在此示例中),需要考虑哪些因素?


其他执行计划是完整性的.

q.count

q.count

q.collect.size

q.collect.size

q.rdd.count

q.rdd.count

performance query-optimization apache-spark apache-spark-sql

7
推荐指数
1
解决办法
4296
查看次数