Dan*_*lds 26 scala dataframe apache-spark apache-spark-sql
我正在寻找一种检查点DataFrames的方法.Checkpoint目前是RDD上的一个操作,但我找不到如何使用DataFrames.持久化和缓存(它们是彼此的同义词)可用于DataFrame但它们不会"破坏谱系",因此不适用于可循环数百(或数千)次迭代的方法.
例如,假设我有一个签名为DataFrame => DataFrame的函数列表.即使myfunctions有数百或数千个条目,我想有办法计算以下内容:
def foo(dataset: DataFrame, g: DataFrame => Unit) =
myfunctions.foldLeft(dataset) {
case (df, f) =>
val nextDF = f(df)
g(nextDF)
nextDF
}
Run Code Online (Sandbox Code Playgroud)
Dan*_*ula 21
TL; DR:对于高达1.6的Spark版本,要实际获得"检查点DF",我建议的解决方案基于另一个答案,但有一个额外的行:
df.rdd.checkpoint
df.rdd.count
val df2 = sqlContext.createDataFrame(df.rdd, df.schema)
// df2 is checkpointed
Run Code Online (Sandbox Code Playgroud)
说明
进一步研究后更新.
正如所指出的,直接检查DataFrame当前不是(Spark 1.6.1),尽管Spark的Jira 存在问题.
因此,可能的解决方法是另一个答案建议:
df.rdd.checkpoint // Assuming the checkpoint dir has already been set
df.count // An action to compute the checkpoint
Run Code Online (Sandbox Code Playgroud)
但是,使用此方法,只会检查df.rdd对象.这可以通过调用验证toDebugString到df.rdd:
scala> df.rdd.toDebugString
(32) MapPartitionsRDD[1] at rdd at <console>:38 []
| ReliableCheckpointRDD[2] at count at <console>:38 []
Run Code Online (Sandbox Code Playgroud)
然后toDebugString在快速转换后调用df(请注意我从JDBC源创建了我的DataFrame),返回以下内容:
scala> df.withColumn("new_column", lit(0)).rdd.toDebugString
res4: String =
(32) MapPartitionsRDD[5] at rdd at <console>:38 []
| MapPartitionsRDD[4] at rdd at <console>:38 []
| JDBCRDD[3] at rdd at <console>:38 []
Run Code Online (Sandbox Code Playgroud)
df.explain 还显示了一个提示:
scala> df.explain
== Physical Plan ==
Scan JDBCRelation (...)
Run Code Online (Sandbox Code Playgroud)
因此,要实际实现"检查点"DataFrame,我只能考虑从检查点RDD创建一个新的:
val newDF = sqlContext.createDataFrame(df.rdd, df.schema)
// or
val newDF = df.rdd.map {
case Row(val1: Int, ..., valN: Int) => (val1, ..., valN)
}.toDF("col1", ..., "colN")
Run Code Online (Sandbox Code Playgroud)
然后我们可以验证新的DataFrame是"检查点":
1)newDF.explain:
scala> newDF.explain
== Physical Plan ==
Scan PhysicalRDD[col1#5, col2#6, col3#7]
Run Code Online (Sandbox Code Playgroud)
2)newDF.rdd.toDebugString:
scala> newDF.rdd.toDebugString
res7: String =
(32) MapPartitionsRDD[10] at rdd at <console>:40 []
| MapPartitionsRDD[8] at createDataFrame at <console>:37 []
| MapPartitionsRDD[1] at rdd at <console>:38 []
| ReliableCheckpointRDD[2] at count at <console>:38 []
Run Code Online (Sandbox Code Playgroud)
3)随着转型:
scala> newDF.withColumn("new_column", lit(0)).rdd.toDebugString
res9: String =
(32) MapPartitionsRDD[12] at rdd at <console>:40 []
| MapPartitionsRDD[11] at rdd at <console>:40 []
| MapPartitionsRDD[8] at createDataFrame at <console>:37 []
| MapPartitionsRDD[1] at rdd at <console>:38 []
| ReliableCheckpointRDD[2] at count at <console>:38 []
Run Code Online (Sandbox Code Playgroud)
此外,我尝试了一些更复杂的转换,并且我能够在实践newDF中检查对象是否被检查点.
因此,我发现可靠地检查DataFrame的唯一方法是通过检查点关联的RDD并从中创建一个新的DataFrame对象.
我希望它有所帮助.干杯.
从spark 2.1开始,数据帧有一个检查点方法(参见http://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.sql.Dataset)你可以直接使用,没有需要通过RDD.
扩展到 Assaf Mendelson 的回答,
从今天的 Spark 2.2 版开始,DataSet#checkpoint()API 正在不断发展和实验
在必须使用 SparkContext 提及检查点 CheckpointDir 之前
spark.sparkContext.setCheckpointDir("checkpoint/dir/location")
val ds: Dataset[Long] = spark.range(10).repartition('id % 2)
// do checkpoint now, it will preserve partition also
val cp: Dataset[Long] = ds.checkpoint()
Run Code Online (Sandbox Code Playgroud)
到目前为止,DataSet 检查点的实现是将 DataSet 转换为 RDD,然后对其进行检查点。
// In DataSet.scala
//API we used in example
def checkpoint(): Dataset[T] = checkpoint(eager = true)
//Base implementation
def checkpoint(eager: Boolean): Dataset[T] = {
val internalRdd = queryExecution.toRdd.map(_.copy())
internalRdd.checkpoint()
if (eager) {
internalRdd.count() //To materialize DataSet immediately on checkpoint() call
}
...
}
Run Code Online (Sandbox Code Playgroud)
最初的问题是关于 Scala Spark,但我认为添加 PySpark 语法也很有用,这非常相似。请注意,与cache/不同persist,checkpoint它不会就地运行(这最初让我感到困惑):
spark.sparkContext.setCheckpointDir("/foo/bar")
df = df.checkpoint()
Run Code Online (Sandbox Code Playgroud)
我想现在你必须做
sc.setCheckpointDir("/DIR")
df.rdd.checkpoint
Run Code Online (Sandbox Code Playgroud)
然后你必须对底层执行你的操作df.rdd。df.ACTION目前无法通话,只能df.rdd.ACTION
| 归档时间: |
|
| 查看次数: |
9721 次 |
| 最近记录: |