有时(例如,测试和bechmarking)我想强制执行在DataFrame上定义的转换.调用类似动作的AFAIK count并不能确保所有Columns实际计算,show只能计算所有动作的子集Rows(参见下面的示例)
我的解决方案是使用写入DataFrameHDFS df.write.saveAsTable,但这会使我的系统"混乱"我不希望继续使用的表.
那么触发评估的最佳方法是DataFrame什么?
编辑:
请注意,最近还讨论了spark开发者列表:http://apache-spark-developers-list.1001551.n3.nabble.com/Will-count-always-trigger-an-evaluation-of-each-行td21018.html
我做了一个小例子,它表明counton DataFrame不会评估所有内容(使用Spark 1.6.3和spark-master =测试local[2]):
val df = sc.parallelize(Seq(1)).toDF("id")
val myUDF = udf((i:Int) => {throw new RuntimeException;i})
df.withColumn("test",myUDF($"id")).count // runs fine
df.withColumn("test",myUDF($"id")).show() // gives Exception
Run Code Online (Sandbox Code Playgroud)
使用相同的逻辑,这里是一个show不评估所有行的示例:
val df = sc.parallelize(1 to 10).toDF("id")
val myUDF = udf((i:Int) => {if(i==10) throw new RuntimeException;i})
df.withColumn("test",myUDF($"id")).show(5) // runs fine
df.withColumn("test",myUDF($"id")).show(10) // gives Exception
Run Code Online (Sandbox Code Playgroud)
编辑2:对于Eliasah:例外情况说:
org.apache.spark.SparkException: Job …Run Code Online (Sandbox Code Playgroud) 我一直在探索最近发布的Spark SQL 2.3.0-SNAPSHOT中的查询优化,并注意到语义相同查询的不同物理计划.
我们假设我必须计算以下数据集中的行数:
val q = spark.range(1)
Run Code Online (Sandbox Code Playgroud)
我可以按如下方式计算行数:
q.countq.collect.sizeq.rdd.countq.queryExecution.toRdd.count我最初的想法是它几乎是一个恒定的操作(肯定是由于本地数据集),它会以某种方式由Spark SQL优化并立即给出结果,尤其是.Spark SQL完全控制查询执行的第一个.
看了查询的物理计划后,我就相信最有效的查询将是最后一个:
q.queryExecution.toRdd.count
Run Code Online (Sandbox Code Playgroud)
原因是:
InternalRow二进制格式反序列化行物理计划就这么简单.
我的推理是否正确?如果是这样,如果我从外部数据源(例如文件,JDBC,Kafka)读取数据集,答案会有所不同吗?
主要问题是,要考虑查询是否比其他查询更有效(在此示例中),需要考虑哪些因素?
其他执行计划是完整性的.
performance query-optimization apache-spark apache-spark-sql