Spark DataFrame cache keeps growing

Ami*_*mit 6 hadoop scala bigdata apache-spark

How does Spark decide how many times to replicate a cached partition?

The storage level shown in the Storage tab of the Spark UI is "Disk Serialized 1x Replicated", yet partitions appear to be copied onto multiple executors. We have observed this with DISK_ONLY persistence on Spark 2.3. We are caching a dataset with 101 partitions (468.4 GB on disk). The data is initially spread across 101 executors (we have 600 executors in total). When we run queries against this dataset, both the size on disk and the number of executors holding the data keep growing. We have also noticed that a single block/partition is often copied onto several executors on the same node; if it is stored on disk, why is it not shared between the executors on that node?

persistedDs = dataset.repartition(101).persist(StorageLevel.DISK_ONLY)
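
As a cross-check on what the Storage tab reports, the block manager master's view of the persisted dataset can be queried from the driver. A minimal sketch, assuming spark is the active SparkSession (getRDDStorageInfo is a DeveloperApi):

// Print what the BlockManagerMaster currently reports for every persisted RDD.
val sc = spark.sparkContext
sc.getRDDStorageInfo.foreach { info =>
  println(
    s"rdd=${info.id} name=${info.name} level=${info.storageLevel} " +
    s"cachedPartitions=${info.numCachedPartitions}/${info.numPartitions} " +
    s"diskSize=${info.diskSize} bytes")
}
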
  • Initial load

    (screenshot: initial load)

  • After running a query on the cached dataset

    (screenshot)

  • A single executor can cache 2 partitions. Also note that the RDD appears cached more than once in the attached screenshot.

    (screenshot)

  • Data distribution across the 101 executors

    (screenshot)

小智 0

I referred to the test code you provided in Spark-44900:

import scala.collection.mutable
import scala.util.Random

import org.apache.spark.rdd.RDD
import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.functions.{md5, rand}
import org.apache.spark.sql.types.StringType
import org.apache.spark.streaming.{Seconds, StreamingContext}

import spark.implicits._

val sc = spark.sparkContext
val ssc = new StreamingContext(sc, Seconds(10))
// create a pseudo stream
val rddQueue = new mutable.Queue[RDD[Long]]()
val stream = ssc.queueStream(rddQueue, oneAtATime = true)
// create a simple lookup table
val lookup: DataFrame = sc.range(start = 0, end = 50000000, numSlices = 10)
  .toDF("id")
  .withColumn("value", md5(rand().cast(StringType)))
  .cache()
// for every micro-batch perform value lookup via join
stream.foreachRDD { rdd =>
  val df = rdd.toDF("id")
  df.join(lookup, Seq("id"), "leftouter").count()
}
// run the streaming
ssc.start()
for (_ <- 1 to 1000000) {
  rddQueue.synchronized {
    val firstId = Random.nextInt(50000000)
    rddQueue += sc.range(start = firstId, end = firstId + 10000, numSlices = 4)
  }
  Thread.sleep(10)
}
ssc.stop()


and ran it with the parameters you provided:

spark-submit --class Spark44900 --master yarn --deploy-mode client --executor-cores 2 --num-executors 5 --executor-memory 1250m --driver-memory 1g --conf spark.dynamicAllocation.enabled=false --conf spark.sql.shuffle.partitions=10 
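
For reference, most of these settings can also be expressed on the session builder; a minimal sketch mirroring the flags above (driver memory in client mode still has to be set before the driver JVM starts, i.e. via spark-submit or spark-defaults):

import org.apache.spark.sql.SparkSession

// Programmatic equivalent of the spark-submit flags above (a sketch).
val spark = SparkSession.builder()
  .appName("Spark44900")
  .master("yarn")
  .config("spark.executor.cores", "2")
  .config("spark.executor.instances", "5")
  .config("spark.executor.memory", "1250m")
  .config("spark.dynamicAllocation.enabled", "false")
  .config("spark.sql.shuffle.partitions", "10")
  .getOrCreate()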

Here is what I observed from the test:

Before refresh:

https://i.stack.imgur.com/eomTg.png

After refresh:

https://i.stack.imgur.com/ihCZW.png

Comparing the information shown in the UI before and after, it is clear that the reported data volume keeps growing.

Here is the information I retrieved from the logs:

23/12/03 21:10:14 DEBUG storage.BlockManager: Getting local block rdd_1260_1
23/12/03 21:10:14 DEBUG storage.BlockManager: Block rdd_1260_1 was not found
23/12/03 21:10:14 DEBUG storage.BlockManager: Getting remote block rdd_1260_1
23/12/03 21:10:14 DEBUG storage.BlockManager: Block rdd_1260_1 not found
23/12/03 21:10:38 INFO memory.MemoryStore: Block rdd_1260_1 stored as values in memory (estimated size 176.7 MB, free 310.2 MB)
23/12/03 21:10:38 DEBUG storage.BlockManagerMaster: Updated info of block rdd_1260_1
23/12/03 21:10:38 DEBUG columnar.LongColumnBuilder: Compressor for [id]: org.apache.spark.sql.execution.columnar.compression.LongDelta$Encoder@38e09408, ratio: 0.1251
23/12/03 21:10:38 DEBUG storage.BlockManager: Told master about block rdd_1260_1
23/12/03 21:10:38 DEBUG columnar.StringColumnBuilder: Compressor for [value]: org.apache.spark.sql.execution.columnar.compression.PassThrough$Encoder@5d47e4a2, ratio: 1.0
23/12/03 21:10:38 DEBUG storage.BlockManager: Put block rdd_1260_1 locally took  24420 ms
23/12/03 21:10:38 DEBUG storage.BlockManager: Putting block rdd_1260_1 without replication took  24423 ms
23/12/03 21:10:38 DEBUG storage.BlockManager: Getting local block rdd_1260_1
23/12/03 21:10:38 DEBUG storage.BlockManager: Level for block rdd_1260_1 is StorageLevel(disk, memory, deserialized, 1 replicas)
23/12/03 21:11:49 DEBUG storage.BlockManager: Getting local block rdd_1260_1
23/12/03 21:11:49 DEBUG storage.BlockManager: Level for block rdd_1260_1 is StorageLevel(disk, memory, deserialized, 1 replicas)
23/12/03 21:11:49 INFO storage.BlockManager: Found block rdd_1260_1 locally
23/12/03 21:11:52 INFO storage.BlockManager: Dropping block rdd_1260_1 from memory
23/12/03 21:11:52 DEBUG memory.MemoryStore: Block rdd_1260_1 of size 185234096 dropped from memory (free 207706349)
23/12/03 21:11:52 DEBUG storage.BlockManagerMaster: Updated info of block rdd_1260_1
23/12/03 21:11:52 DEBUG storage.BlockManager: Told master about block rdd_1260_1
23/12/03 21:11:57 DEBUG storage.BlockManager: Getting local block rdd_1260_1
23/12/03 21:11:57 DEBUG storage.BlockManager: Level for block rdd_1260_1 is StorageLevel(disk, memory, deserialized, 1 replicas)
23/12/03 21:11:58 INFO memory.MemoryStore: Block rdd_1260_1 stored as values in memory (estimated size 176.7 MB, free 133.6 MB)
23/12/03 21:11:58 INFO storage.BlockManager: Found block rdd_1260_1 locally

The logs above trace rdd_1260_1 specifically. Judging from its behavior, the data for this partition is repeatedly materialized and evicted in memory (stored, then dropped). The write to the disk cache only happens on the initial lookup; subsequent lookups hit the locally cached block directly.
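
These entries are DEBUG-level traces from the storage subsystem. A sketch of how they can be enabled without raising the global log level (this assumes the log4j 1.x API bundled with Spark 2.x; for executors the usual route is a custom log4j.properties shipped with --files):

import org.apache.log4j.{Level, Logger}

// Turn on DEBUG only for the block manager and columnar-cache classes (a sketch).
Logger.getLogger("org.apache.spark.storage").setLevel(Level.DEBUG)
Logger.getLogger("org.apache.spark.sql.execution.columnar").setLevel(Level.DEBUG)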

Based on this observation, I suspect this may be an issue with how the UI displays the numbers; the cache is not actually being replicated over and over.
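
The Storage tab is rendered from the UI's REST API, so the same figures can be pulled and compared directly; a minimal sketch, assuming the default driver UI at localhost:4040:

import scala.io.Source

// Fetch the raw storage metrics the UI renders (per-RDD numCachedPartitions,
// memoryUsed and diskUsed). Host and port are placeholders for the driver UI.
val appId = spark.sparkContext.applicationId
val rddStorageJson = Source.fromURL(s"http://localhost:4040/api/v1/applications/$appId/storage/rdd").mkString
println(rddStorageJson)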

Below is the disk usage in HDFS:

The HDFS cluster I used for testing holds no extra redundant data. Judging by data volume alone, the actual usage is only 5.8 GB, which is far from the Disk Size: 222.9 GB shown in the UI. Based on this comparison, I believe this may be a bug in the Spark metrics module, although I have not yet dug into the exact cause.
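
As an additional cross-check: DISK_ONLY blocks are written by the DiskBlockManager to the executors' spark.local.dir rather than to HDFS, so the rdd_* block files there can be summed on an executor host. A rough sketch with a placeholder path (on YARN the block directories normally sit under the NodeManager's appcache dirs):

import java.nio.file.{Files, Paths}
import scala.collection.JavaConverters._

// Sum the size of cached rdd_* block files under one Spark local directory.
// The path is a placeholder; substitute the host's actual spark.local.dir.
val localDir = Paths.get("/tmp/spark-local")
val cachedBytes = Files.walk(localDir).iterator().asScala
  .filter(p => Files.isRegularFile(p) && p.getFileName.toString.startsWith("rdd_"))
  .map(p => Files.size(p))
  .sum
println(s"rdd_* block bytes on this host: $cachedBytes")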

That summarizes my findings so far. I hope this matches the problem you have been facing; if I have misunderstood anything, I would appreciate your clarification. Thanks.