如何在Spark中强制进行DataFrame评估

Rap*_*oth 15 scala apache-spark

有时(例如,测试和bechmarking)我想强制执行在DataFrame上定义的转换.调用类似动作的AFAIK count并不能确保所有Columns实际计算,show只能计算所有动作的子集Rows(参见下面的示例)

我的解决方案是使用写入DataFrameHDFS df.write.saveAsTable,但这会使我的系统"混乱"我不希望继续使用的表.

那么触发评估的最佳方法是DataFrame什么?

编辑:

请注意,最近还讨论了spark开发者列表:http://apache-spark-developers-list.1001551.n3.nabble.com/Will-count-always-trigger-an-evaluation-of-each-行td21018.html

我做了一个小例子,它表明counton DataFrame不会评估所有内容(使用Spark 1.6.3和spark-master =测试local[2]):

val df = sc.parallelize(Seq(1)).toDF("id")
val myUDF = udf((i:Int) => {throw new RuntimeException;i})

df.withColumn("test",myUDF($"id")).count // runs fine
df.withColumn("test",myUDF($"id")).show() // gives Exception
Run Code Online (Sandbox Code Playgroud)

使用相同的逻辑,这里是一个show不评估所有行的示例:

val df = sc.parallelize(1 to 10).toDF("id")
val myUDF = udf((i:Int) => {if(i==10) throw new RuntimeException;i})

df.withColumn("test",myUDF($"id")).show(5) // runs fine
df.withColumn("test",myUDF($"id")).show(10) // gives Exception
Run Code Online (Sandbox Code Playgroud)

编辑2:对于Eliasah:例外情况说:

org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 6.0 failed 1 times, most recent failure: Lost task 0.0 in stage 6.0 (TID 6, localhost): java.lang.RuntimeException
    at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$anonfun$1.apply$mcII$sp(<console>:68)
    at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$anonfun$1.apply(<console>:68)
    at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$anonfun$1.apply(<console>:68)
    at org.apache.spark.sql.catalyst.expressions.GeneratedClass$SpecificUnsafeProjection.apply(Unknown Source)
    at org.apache.spark.sql.execution.Project$$anonfun$1$$anonfun$apply$1.apply(basicOperators.scala:51)
    at org.apache.spark.sql.execution.Project$$anonfun$1$$anonfun$apply$1.apply(basicOperators.scala:49)
    at scala.collection.Iterator$$anon$11.next(Iterator.scala:328)
.
.
.
.

Driver stacktrace:
    at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1431)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1419)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1418)
    at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
    at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
    at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1418)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:799)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:799)
    at scala.Option.foreach(Option.scala:236)
    at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:799)
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:1640)
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1599)
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1588)
    at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)
    at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:620)
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:1832)
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:1845)
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:1858)
    at org.apache.spark.sql.execution.SparkPlan.executeTake(SparkPlan.scala:212)
    at org.apache.spark.sql.execution.Limit.executeCollect(basicOperators.scala:165)
    at org.apache.spark.sql.execution.SparkPlan.executeCollectPublic(SparkPlan.scala:174)
    at org.apache.spark.sql.DataFrame$$anonfun$org$apache$spark$sql$DataFrame$$execute$1$1.apply(DataFrame.scala:1500)
    at org.apache.spark.sql.DataFrame$$anonfun$org$apache$spark$sql$DataFrame$$execute$1$1.apply(DataFrame.scala:1500)
    at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:56)
    at org.apache.spark.sql.DataFrame.withNewExecutionId(DataFrame.scala:2087)
    at org.apache.spark.sql.DataFrame.org$apache$spark$sql$DataFrame$$execute$1(DataFrame.scala:1499)
    at org.apache.spark.sql.DataFrame.org$apache$spark$sql$DataFrame$$collect(DataFrame.scala:1506)
    at org.apache.spark.sql.DataFrame$$anonfun$head$1.apply(DataFrame.scala:1376)
    at org.apache.spark.sql.DataFrame$$anonfun$head$1.apply(DataFrame.scala:1375)
    at org.apache.spark.sql.DataFrame.withCallback(DataFrame.scala:2100)
    at org.apache.spark.sql.DataFrame.head(DataFrame.scala:1375)
    at org.apache.spark.sql.DataFrame.take(DataFrame.scala:1457)
    at org.apache.spark.sql.DataFrame.showString(DataFrame.scala:170)
    at org.apache.spark.sql.DataFrame.show(DataFrame.scala:350)
    at org.apache.spark.sql.DataFrame.show(DataFrame.scala:311)
    at org.apache.spark.sql.DataFrame.show(DataFrame.scala:319)
    at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:74)
.
.
.
.
Run Code Online (Sandbox Code Playgroud)

Vin*_*Bdn 13

这是一个有点晚了,但这里的根本原因:count不采取同样的RDDDataFrame.

DataFrame那儿是一个优化,在某些情况下,你不需要将数据加载到真正知道它有元素的数量(尤其是在你的情况下没有数据搅乱参与).因此,DataFrame物化的时候count被调用将不会加载任何数据,也不会传递到您的异常抛出.您可以轻松定义自己做了实验DefaultSource,并Relation和看到呼吁countDataFrame将在方法最后总是buildScan没有requiredColumns不管你有多少列做选择(参见org.apache.spark.sql.sources.interfaces了解更多信息).这实际上是一种非常有效的优化;-)

RDD但是,在s中,没有这样的优化(这就是为什么人们应该总是尝试DataFrame在可能的情况下使用s).因此,counton RDD执行所有的lineage并返回组成任何分区的迭代器的所有大小的总和.

呼叫dataframe.count进入第一个解释,但拨打电话dataframe.rdd.count进入第二个像你一样打造RDD出你的DataFrame.请注意,当您需要Spark来缓存结果时,调用dataframe.cache().count强制实现dataframe(因此它需要加载所有数据并对其进行转换).但它确实有缓存数据的副作用......

  • 难道“缓存”不是一个建议来激发而不是强迫它实现吗? (2认同)

Sac*_*agi 12

我猜只是得到一个基本rddDataFrame和触发它的作用应该达到你在找什么.

df.withColumn("test",myUDF($"id")).rdd.count // this gives proper exceptions
Run Code Online (Sandbox Code Playgroud)

  • 这很奇怪.看起来这个答案适用于原始问题,但有人仍设法给出几个downvotes而不提任何理由或反馈.:) (5认同)