Tags: scala, persist, directed-acyclic-graphs, apache-spark
I have the following code. I call count to trigger the persist and materialize the transformations above it. But in the DAG I noticed that both count jobs run the first persist (Persist 67), while I expected the second count to hit only the second persist.
val df = sparkSession.read
  .parquet(bigData)
  .filter(row => dateRange(row.getLong(5), lowerTimeBound, upperTimeBound))
  .as[SafegraphRawData]
  // Repartition here so that shuffle operations can be performed later,
  // along with other transformations and minor filtering
  .repartition(nrInputPartitions)
  // First persist, since the objects do not fit in memory (shows as Persist 67 in the DAG)
  .persist(StorageLevel.MEMORY_AND_DISK)
LOG.info(s"First count = ${df.count}")
val filter: BaseFilter = new BaseFilter()
LOG.info(s"Number of partitions: ${df.rdd.getNumPartitions}")
val rddPoints = df
  .map(parse)
  .filter(filter.IsValid(_, deviceStageMetricService, providerdevicelist, sparkSession))
  .map(convert)
// Second persist: the count and partitionBy actions below should reuse all the transformations above
val dsPoints = rddPoints.persist(StorageLevel.MEMORY_AND_DISK)
val totalPoints = dsPoints.count()
LOG.info(s"Second count = $totalPoints")
When you specify StorageLevel.MEMORY_AND_DISK, Spark tries to fit all the data in memory and spills to disk only when it does not fit.

Now, you are persisting multiple times here. Spark's in-memory cache is LRU, so a later persist can evict data cached by an earlier one.

Even with StorageLevel.MEMORY_AND_DISK, when cached data is evicted from memory to make room for another cached dataset, Spark does not spill it to disk. So when you run the next count, it has to re-evaluate the DAG to recompute the partitions that are no longer in the cache.

I suggest you use StorageLevel.DISK_ONLY to avoid this recomputation.
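Another way to keep the two caches from competing for memory is to release the first one explicitly once it is no longer needed. A sketch using the names from the question (`dsPoints` is derived from `df`, so `df` may only be unpersisted after `dsPoints` has been materialized):

```scala
// Sketch: free the first cache once the derived dataset is materialized.
val dsPoints = rddPoints.persist(StorageLevel.MEMORY_AND_DISK)
val totalPoints = dsPoints.count() // materializes dsPoints from the cached df

// df's cached blocks are no longer needed; unpersist them so they
// cannot push dsPoints' blocks out of the LRU memory cache.
df.unpersist()
```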