持久/缓存RDD上的Spark RDD检查点正在执行两次DAG

Gle*_*ker 6 caching persist checkpoint apache-spark rdd

当我运行如下代码时:

val newRDD = prevRDD.map(a => (a._1, 1L)).distinct.persist(StorageLevel.MEMORY_AND_DISK_SER)
newRDD.checkpoint
print(newRDD.count())
Run Code Online (Sandbox Code Playgroud)

并且观察Yarn中的阶段,我注意到Spark正在进行DAG计算TWICE - 一次用于实现RDD并将其缓存的distinct + count,然后是完全第二次创建检查点副本.

由于RDD已经实现并缓存,为什么检查点不能简单地利用这一点,并将缓存的分区保存到磁盘?

是否存在一种现有方式(某种配置设置或代码更改)以强制Spark利用此功能并仅运行ONCE操作,并且检查点只会复制内容?

我需要两次"实现"吗?

val newRDD = prevRDD.map(a => (a._1, 1L)).distinct.persist(StorageLevel.MEMORY_AND_DISK_SER)
print(newRDD.count())

newRDD.checkpoint
print(newRDD.count())
Run Code Online (Sandbox Code Playgroud)

我已经创建了一个Apache Spark Jira票证,以使其成为一项功能请求:https: //issues.apache.org/jira/browse/SPARK-8666

Gle*_*ker 6

看起来这可能是一个已知问题.查看较旧的JIRA门票,https://issues.apache.org/jira/browse/SPARK-8582


Dav*_*von 5

这是一个老问题了。但这也影响了我,所以我做了一些挖掘。我在 jira 和 github 的变更跟踪历史记录中发现了一堆非常无用的搜索结果。这些搜索结果包含开发人员关于他们提议的编程更改的大量技术言论。这对我来说并没有提供太多信息,我建议限制你花在看它上的时间。

我能找到的关于此事的最清晰的信息在这里: https://github.com/JerryLead/SparkInternals/blob/master/markdown/english/6-CacheAndCheckpoint.md

需要设置检查点的 RDD 将被计算两次;因此建议在 rdd.checkpoint() 之前执行 rdd.cache()

鉴于OP实际上确实使用了持久化和检查点,他可能走在正确的轨道上。我怀疑唯一的问题是他调用检查点的方式。我对 Spark 还很陌生,但我认为他应该这样做:

newRDD = newRDD.检查点

希望这一点是清楚的。根据我的测试,这消除了我的一个数据帧的冗余重新计算。