Why does Spark repeat transformations after persist operations?

jk1*_*jk1 1 scala persist directed-acyclic-graphs apache-spark

I have the following code. I call count to trigger the persist operation and materialize the transformations above it. But I noticed that the DAGs and stages for the two different count jobs both run the first persist (whereas I expected only the second persist to be computed in the second count call).

val df = sparkSession.read
      .parquet(bigData)
      .filter(row => dateRange(row.getLong(5), lowerTimeBound, upperTimeBound))
      .as[SafegraphRawData]
      // Repartition here so we can perform shuffle operations later
      // (other transformations and minor filtering)
      .repartition(nrInputPartitions)
      // First persist here, since the objects do not fit in memory (Persist 67)
      .persist(StorageLevel.MEMORY_AND_DISK)

    LOG.info(s"First count  = " + df.count)

    val filter: BaseFilter = new BaseFilter()

    LOG.info(s"Number of partitions: " + df.rdd.getNumPartitions)
    val rddPoints = df
      .map(parse)
      .filter(filter.IsValid(_, deviceStageMetricService, providerdevicelist, sparkSession))
      .map(convert)
    // Since we will perform count and partitionBy actions, compute all the transformations above (second persist)
    val dsPoints = rddPoints.persist(StorageLevel.MEMORY_AND_DISK)
    val totalPoints = dsPoints.count()
    LOG.info(s"Second count  = $totalPoints")


Avi*_*rya 5

When you say StorageLevel.MEMORY_AND_DISK, Spark tries to fit all the data in memory and spills whatever does not fit to disk.

Now, you do multiple persists here. In Spark the in-memory cache is LRU, so a later persist can evict data that was cached earlier.

Even if you specify StorageLevel.MEMORY_AND_DISK, when data is evicted from the cache by other cached data, Spark does not spill it to disk. So when you run the next count, it needs to re-evaluate the DAG so that it can recover the partitions that are no longer in the cache.
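A related mitigation, which is my own sketch and not part of this answer's recommendation: unpersist the first cache once the downstream Dataset has been materialized, so the two caches stop competing for memory. The object name and the toy data below are placeholders, not the question's job.

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.storage.StorageLevel

// Hypothetical sketch: release the upstream cache after the downstream
// Dataset is materialized, so it cannot evict the cache we still need.
object UnpersistSketch {
  def run(): Long = {
    val spark = SparkSession.builder
      .master("local[*]")
      .appName("unpersist-sketch")
      .getOrCreate()

    val df = spark.range(0, 100).persist(StorageLevel.MEMORY_AND_DISK)
    df.count() // materialize the first cache

    val derived = df
      .filter(_ % 2 == 0)
      .persist(StorageLevel.MEMORY_AND_DISK)
    val n = derived.count() // materialize the second cache

    df.unpersist() // first cache no longer needed; free it explicitly
    spark.stop()
    n
  }
}
```

In the question's code this would mean calling `df.unpersist()` after `dsPoints.count()`, once `dsPoints` no longer depends on recomputing `df`.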

I suggest you use StorageLevel.DISK_ONLY to avoid this recomputation.
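A minimal sketch of that recommendation, assuming a local session; the object name, master URL, and toy data are my own placeholders, not the question's job:

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.storage.StorageLevel

// Hypothetical minimal example of DISK_ONLY caching.
object DiskOnlyCacheSketch {
  def run(): (Long, Long) = {
    val spark = SparkSession.builder
      .master("local[*]")
      .appName("disk-only-cache")
      .getOrCreate()

    val df = spark.range(0, 1000)
      .repartition(4)
      .persist(StorageLevel.DISK_ONLY) // persist is lazy: nothing is cached yet

    val first = df.count()  // first action writes every partition to disk
    val second = df.count() // reads the disk cache; no in-memory LRU eviction

    spark.stop()
    (first, second)
  }
}
```

With DISK_ONLY, cached partitions are never held in the LRU memory store, so a later persist cannot push them out and the DAG does not need to be re-evaluated.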