如何检查DataFrames?

Dan*_*lds 26 scala dataframe apache-spark apache-spark-sql

我正在寻找一种检查点DataFrames的方法.Checkpoint目前是RDD上的一个操作,但我找不到如何使用DataFrames.持久化和缓存(它们是彼此的同义词)可用于DataFrame但它们不会"破坏谱系",因此不适用于可循环数百(或数千)次迭代的方法.

例如,假设我有一个签名为DataFrame => DataFrame的函数列表.即使myfunctions有数百或数千个条目,我想有办法计算以下内容:

def foo(dataset: DataFrame, g: DataFrame => Unit) =
    myfunctions.foldLeft(dataset) {
        case (df, f) =>
            val nextDF = f(df)
            g(nextDF)
            nextDF
   }
Run Code Online (Sandbox Code Playgroud)

Dan*_*ula 21

TL; DR:对于高达1.6的Spark版本,要实际获得"检查点DF",我建议的解决方案基于另一个答案,但有一个额外的行:

df.rdd.checkpoint
df.rdd.count
val df2 = sqlContext.createDataFrame(df.rdd, df.schema)
// df2 is checkpointed
Run Code Online (Sandbox Code Playgroud)

说明

进一步研究后更新.

正如所指出的,直接检查DataFrame当前不是(Spark 1.6.1),尽管Spark的Jira 存在问题.

因此,可能的解决方法是另一个答案建议:

df.rdd.checkpoint // Assuming the checkpoint dir has already been set
df.count // An action to compute the checkpoint
Run Code Online (Sandbox Code Playgroud)

但是,使用此方法,只会检查df.rdd对象.这可以通过调用验证toDebugStringdf.rdd:

 scala> df.rdd.toDebugString
 (32) MapPartitionsRDD[1] at rdd at <console>:38 []
  |   ReliableCheckpointRDD[2] at count at <console>:38 []
Run Code Online (Sandbox Code Playgroud)

然后toDebugString在快速转换后调用df(请注意我从JDBC源创建了我的DataFrame),返回以下内容:

scala> df.withColumn("new_column", lit(0)).rdd.toDebugString
res4: String =
(32) MapPartitionsRDD[5] at rdd at <console>:38 []
 |   MapPartitionsRDD[4] at rdd at <console>:38 []
 |   JDBCRDD[3] at rdd at <console>:38 []
Run Code Online (Sandbox Code Playgroud)

df.explain 还显示了一个提示:

scala> df.explain
== Physical Plan ==
Scan JDBCRelation (...)
Run Code Online (Sandbox Code Playgroud)

因此,要实际实现"检查点"DataFrame,我只能考虑从检查点RDD创建一个新的:

val newDF = sqlContext.createDataFrame(df.rdd, df.schema)
// or
val newDF = df.rdd.map { 
  case Row(val1: Int, ..., valN: Int) => (val1, ..., valN)
}.toDF("col1", ..., "colN")
Run Code Online (Sandbox Code Playgroud)

然后我们可以验证新的DataFrame是"检查点":

1)newDF.explain:

scala> newDF.explain
== Physical Plan ==
Scan PhysicalRDD[col1#5, col2#6, col3#7]
Run Code Online (Sandbox Code Playgroud)

2)newDF.rdd.toDebugString:

scala> newDF.rdd.toDebugString
res7: String =
(32) MapPartitionsRDD[10] at rdd at <console>:40 []
 |   MapPartitionsRDD[8] at createDataFrame at <console>:37 []
 |   MapPartitionsRDD[1] at rdd at <console>:38 []
 |   ReliableCheckpointRDD[2] at count at <console>:38 []
Run Code Online (Sandbox Code Playgroud)

3)随着转型:

scala> newDF.withColumn("new_column", lit(0)).rdd.toDebugString
res9: String =
(32) MapPartitionsRDD[12] at rdd at <console>:40 []
 |   MapPartitionsRDD[11] at rdd at <console>:40 []
 |   MapPartitionsRDD[8] at createDataFrame at <console>:37 []
 |   MapPartitionsRDD[1] at rdd at <console>:38 []
 |   ReliableCheckpointRDD[2] at count at <console>:38 []
Run Code Online (Sandbox Code Playgroud)

此外,我尝试了一些更复杂的转换,并且我能够在实践newDF中检查对象是否被检查点.

因此,我发现可靠地检查DataFrame的唯一方法是通过检查点关联的RDD并从中创建一个新的DataFrame对象.

我希望它有所帮助.干杯.


Ass*_*son 8

从spark 2.1开始,数据帧有一个检查点方法(参见http://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.sql.Dataset)你可以直接使用,没有需要通过RDD.


mrs*_*vas 6

扩展到 Assaf Mendelson 的回答,

从今天的 Spark 2.2 版开始,DataSet#checkpoint()API 正在不断发展和实验

用法:

在必须使用 SparkContext 提及检查点 CheckpointDir 之前

spark.sparkContext.setCheckpointDir("checkpoint/dir/location")

val ds: Dataset[Long] = spark.range(10).repartition('id % 2)

// do checkpoint now, it will preserve partition also
val cp: Dataset[Long] = ds.checkpoint()
Run Code Online (Sandbox Code Playgroud)

内部运作如何?

到目前为止,DataSet 检查点实现是将 DataSet 转换为 RDD,然后对其进行检查点。

// In DataSet.scala 

//API we used in example  
def checkpoint(): Dataset[T] = checkpoint(eager = true)

//Base implementation
def checkpoint(eager: Boolean): Dataset[T] = {
    val internalRdd = queryExecution.toRdd.map(_.copy())
    internalRdd.checkpoint()

    if (eager) {
      internalRdd.count() //To materialize DataSet immediately on checkpoint() call
    }

  ...
}
Run Code Online (Sandbox Code Playgroud)


abe*_*bop 6

最初的问题是关于 Scala Spark,但我认为添加 PySpark 语法也很有用,这非常相似。请注意,与cache/不同persistcheckpoint它不会就地运行(这最初让我感到困惑):

spark.sparkContext.setCheckpointDir("/foo/bar")
df = df.checkpoint()
Run Code Online (Sandbox Code Playgroud)


Jus*_*ony 4

我想现在你必须做

sc.setCheckpointDir("/DIR")
df.rdd.checkpoint
Run Code Online (Sandbox Code Playgroud)

然后你必须对底层执行你的操作df.rdddf.ACTION目前无法通话,只能df.rdd.ACTION