为什么在Spark数据集上调用缓存需要很长时间?

Jos*_*der 6 caching scala dataset apache-spark

我正在加载大型数据集,然后在整个代码中缓存它们以供参考.代码看起来像这样:

val conversations = sqlContext.read
  .format("com.databricks.spark.redshift")
  .option("url", jdbcUrl)
  .option("tempdir", tempDir)
  .option("forward_spark_s3_credentials","true")
  .option("query", "SELECT * FROM my_table "+
                   "WHERE date <= '2017-06-03' "+
                   "AND date >= '2017-03-06' ")
  .load()
  .cache()
Run Code Online (Sandbox Code Playgroud)

如果我不使用缓存,代码会快速执行,因为数据集会被懒惰地评估.但是,如果我放入缓存(),该块需要很长时间才能运行.

从在线Spark UI的事件时间轴,似乎SQL表被传输到工作节点,然后缓存在工作节点上.

为什么缓存会立即执行?源代码似乎仅在计算数据时将其标记为缓存:

调用缓存或持久性时,数据集源代码调用CacheManager.scala中的此代码:

  /**
   * Caches the data produced by the logical representation of the given [[Dataset]].
   * Unlike `RDD.cache()`, the default storage level is set to be `MEMORY_AND_DISK` because
   * recomputing the in-memory columnar representation of the underlying table is expensive.
   */
  def cacheQuery(
      query: Dataset[_],
      tableName: Option[String] = None,
      storageLevel: StorageLevel = MEMORY_AND_DISK): Unit = writeLock {
    val planToCache = query.logicalPlan
    if (lookupCachedData(planToCache).nonEmpty) {
      logWarning("Asked to cache already cached data.")
    } else {
      val sparkSession = query.sparkSession
      cachedData.add(CachedData(
        planToCache,
        InMemoryRelation(
          sparkSession.sessionState.conf.useCompression,
          sparkSession.sessionState.conf.columnBatchSize,
          storageLevel,
          sparkSession.sessionState.executePlan(planToCache).executedPlan,
          tableName)))
    }
  }
Run Code Online (Sandbox Code Playgroud)

这似乎只标记缓存而不是实际缓存数据.我希望缓存能够立即根据Stack Overflow上的其他答案返回.

有没有其他人在对数据集执行操作之前立即看到缓存?为什么会这样?

Jos*_*der 0

我现在相信,正如Erik van Oosten 回答的那样,cache() 命令会导致查询执行。

仔细观察我的OP中的代码确实表明该命令正在被缓存。我认为缓存发生在两个关键行:

cachedData.add(CachedData(...))
Run Code Online (Sandbox Code Playgroud)

此行创建一个新的 CachedData 对象,该对象被添加到某种类型的 cachedData 集合中。虽然缓存数据对象可能是稍后保存缓存数据的占位符,但 CachedData 对象似乎更可能真正保存缓存数据。

更重要的是,这一行:

sparkSession.sessionState.executePlan(planToCache).executedPlan
Run Code Online (Sandbox Code Playgroud)

看来确实在执行计划。因此,根据我的经验,Erik van Oosten 对这里发生的事情的直觉以及源代码,我相信调用 cache() 会导致 Spark 数据集的计划被执行。