Ami*_*mit 6 hadoop scala bigdata apache-spark
How does Spark decide how many times to replicate a cached partition?
The Storage tab in the Spark UI shows the storage level as "Disk Serialized 1x Replicated", but partitions appear to be replicated onto multiple executors. We have seen this happen with DISK_ONLY on Spark 2.3. We are caching a dataset with 101 partitions (468.4 GB on disk). The data is initially distributed across 101 executors (we have 600 executors in total). When we run queries against this dataset, both the size on disk and the number of executors the data is spread across grow. We have also noticed that a single block/partition is often replicated onto several executors on the same node — if it is stored on disk, why isn't it shared between executors on the same node?
val persistedDs = dataset.repartition(101).persist(StorageLevel.DISK_ONLY)
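For context, a Spark `StorageLevel` carries an explicit replication count, and `DISK_ONLY` requests exactly one replica (the replicated variant is `DISK_ONLY_2`). A toy pure-Scala model of that structure, for illustration only — the real class is `org.apache.spark.storage.StorageLevel`, and this sketch only mimics its fields and string form:

```scala
// Toy model of Spark's StorageLevel, for illustration only --
// the real class lives in org.apache.spark.storage.StorageLevel.
case class ToyStorageLevel(useDisk: Boolean,
                           useMemory: Boolean,
                           deserialized: Boolean,
                           replication: Int) {
  override def toString: String = {
    val parts = Seq(
      if (useDisk) Some("disk") else None,
      if (useMemory) Some("memory") else None,
      if (deserialized) Some("deserialized") else None,
      Some(s"$replication replicas")
    ).flatten
    s"StorageLevel(${parts.mkString(", ")})"
  }
}

// DISK_ONLY asks for a single replica; DISK_ONLY_2 asks for two.
val DISK_ONLY   = ToyStorageLevel(useDisk = true, useMemory = false, deserialized = false, replication = 1)
val DISK_ONLY_2 = DISK_ONLY.copy(replication = 2)

println(DISK_ONLY)   // StorageLevel(disk, 1 replicas)
println(DISK_ONLY_2) // StorageLevel(disk, 2 replicas)
```

So with `DISK_ONLY` the requested replication is 1 — which is why the UI showing "1x Replicated" while blocks multiply across executors looks contradictory.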
小智 0
I referred to the test code you provided in SPARK-44900:
import scala.collection.mutable
import scala.util.Random

import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.sql.functions.{md5, rand}
import org.apache.spark.sql.types.StringType
import org.apache.spark.streaming.{Seconds, StreamingContext}

object Spark44900 {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().appName("Spark44900").getOrCreate()
    import spark.implicits._

    val sc = spark.sparkContext
    val ssc = new StreamingContext(sc, Seconds(10))

    // create a pseudo stream
    val rddQueue = new mutable.Queue[RDD[Long]]()
    val stream = ssc.queueStream(rddQueue, oneAtATime = true)

    // create a simple lookup table
    val lookup: DataFrame = sc.range(start = 0, end = 50000000, numSlices = 10)
      .toDF("id")
      .withColumn("value", md5(rand().cast(StringType)))
      .cache()

    // for every micro-batch perform value lookup via join
    stream.foreachRDD { rdd =>
      val df = rdd.toDF("id")
      df.join(lookup, Seq("id"), "leftouter").count()
    }

    // run the streaming
    ssc.start()
    for (_ <- 1 to 1000000) {
      rddQueue.synchronized {
        val firstId = Random.nextInt(50000000)
        rddQueue += sc.range(start = firstId, end = firstId + 10000, numSlices = 4)
      }
      Thread.sleep(10)
    }
    ssc.stop()
  }
}
and submitted it with the parameters you provided:
spark-submit --class Spark44900 --master yarn --deploy-mode client --executor-cores 2 --num-executors 5 --executor-memory 1250m --driver-memory 1g --conf spark.dynamicAllocation.enabled=false --conf spark.sql.shuffle.partitions=10
Here is what I observed in the test.
Before the refresh:
https://i.stack.imgur.com/eomTg.png
After the refresh:
https://i.stack.imgur.com/ihCZW.png
Comparing the information shown in the UI, the amount of data does indeed appear to grow.
Here is what I retrieved from the logs:
23/12/03 21:10:14 DEBUG storage.BlockManager: Getting local block rdd_1260_1
23/12/03 21:10:14 DEBUG storage.BlockManager: Block rdd_1260_1 was not found
23/12/03 21:10:14 DEBUG storage.BlockManager: Getting remote block rdd_1260_1
23/12/03 21:10:14 DEBUG storage.BlockManager: Block rdd_1260_1 not found
23/12/03 21:10:38 INFO memory.MemoryStore: Block rdd_1260_1 stored as values in memory (estimated size 176.7 MB, free 310.2 MB)
23/12/03 21:10:38 DEBUG storage.BlockManagerMaster: Updated info of block rdd_1260_1
23/12/03 21:10:38 DEBUG columnar.LongColumnBuilder: Compressor for [id]: org.apache.spark.sql.execution.columnar.compression.LongDelta$Encoder@38e09408, ratio: 0.1251
23/12/03 21:10:38 DEBUG storage.BlockManager: Told master about block rdd_1260_1
23/12/03 21:10:38 DEBUG columnar.StringColumnBuilder: Compressor for [value]: org.apache.spark.sql.execution.columnar.compression.PassThrough$Encoder@5d47e4a2, ratio: 1.0
23/12/03 21:10:38 DEBUG storage.BlockManager: Put block rdd_1260_1 locally took 24420 ms
23/12/03 21:10:38 DEBUG storage.BlockManager: Putting block rdd_1260_1 without replication took 24423 ms
23/12/03 21:10:38 DEBUG storage.BlockManager: Getting local block rdd_1260_1
23/12/03 21:10:38 DEBUG storage.BlockManager: Level for block rdd_1260_1 is StorageLevel(disk, memory, deserialized, 1 replicas)
23/12/03 21:11:49 DEBUG storage.BlockManager: Getting local block rdd_1260_1
23/12/03 21:11:49 DEBUG storage.BlockManager: Level for block rdd_1260_1 is StorageLevel(disk, memory, deserialized, 1 replicas)
23/12/03 21:11:49 INFO storage.BlockManager: Found block rdd_1260_1 locally
23/12/03 21:11:52 INFO storage.BlockManager: Dropping block rdd_1260_1 from memory
23/12/03 21:11:52 DEBUG memory.MemoryStore: Block rdd_1260_1 of size 185234096 dropped from memory (free 207706349)
23/12/03 21:11:52 DEBUG storage.BlockManagerMaster: Updated info of block rdd_1260_1
23/12/03 21:11:52 DEBUG storage.BlockManager: Told master about block rdd_1260_1
23/12/03 21:11:57 DEBUG storage.BlockManager: Getting local block rdd_1260_1
23/12/03 21:11:57 DEBUG storage.BlockManager: Level for block rdd_1260_1 is StorageLevel(disk, memory, deserialized, 1 replicas)
23/12/03 21:11:58 INFO memory.MemoryStore: Block rdd_1260_1 stored as values in memory (estimated size 176.7 MB, free 133.6 MB)
23/12/03 21:11:58 INFO storage.BlockManager: Found block rdd_1260_1 locally
The log above traces rdd_1260_1 specifically. From its behavior, the data in this partition is repeatedly stored in and evicted from memory. The disk cache is only written on the initial lookup; subsequent lookups hit the locally cached block directly.
Based on this observation, I suspect this may be a UI display issue: the cache is not actually being replicated over and over.
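The pattern in the log — local miss, remote miss, compute-and-store, later local hits, then a drop from memory followed by re-caching on the next read — matches a get-or-compute path with memory eviction. A simplified pure-Scala sketch of that flow (my own illustration, not Spark's BlockManager implementation):

```scala
import scala.collection.mutable

// Simplified illustration of the get-or-compute flow seen in the log;
// NOT Spark's actual BlockManager implementation.
class ToyBlockManager(compute: String => Array[Byte], memoryCapacity: Long) {
  private val memory = mutable.Map.empty[String, Array[Byte]]
  private var used = 0L
  var computeCount = 0 // how often a block had to be (re)materialized

  def get(blockId: String): Array[Byte] = memory.get(blockId) match {
    case Some(bytes) =>
      bytes                    // "Found block ... locally"
    case None =>               // "Block ... was not found" -> compute and store
      computeCount += 1
      val bytes = compute(blockId)
      // evict until the new block fits: "Dropping block ... from memory"
      while (used + bytes.length > memoryCapacity && memory.nonEmpty) {
        val (victim, dropped) = memory.head
        memory.remove(victim)
        used -= dropped.length
      }
      memory(blockId) = bytes
      used += bytes.length
      bytes
  }
}

// Two 100-byte blocks competing for 150 bytes of memory:
val bm = new ToyBlockManager(_ => Array.fill(100)(0: Byte), memoryCapacity = 150)
bm.get("rdd_1260_1")     // miss: computed and stored
bm.get("rdd_1260_1")     // hit: like "Found block rdd_1260_1 locally"
bm.get("rdd_1260_2")     // miss: evicts rdd_1260_1 to make room
bm.get("rdd_1260_1")     // miss again: re-stored, like the second store in the log
println(bm.computeCount) // 3
```

Each drop/re-store cycle produces another "Updated info of block" message to the master, exactly the kind of repeated status update the log shows for one and the same block.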
Below is the disk usage in HDFS:
The HDFS cluster I used for the test contains no extra redundant data. Judging from the data volume alone, actual usage is only 5.8 GB, far from the 222.9 GB Disk Size shown in the UI. Based on this comparison, I believe this may be a bug in Spark's metrics module, although I have not dug into the exact cause.
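One way the UI number could inflate to ~222.9 GB while HDFS holds only ~5.8 GB is if a size metric is accumulated on every block-status update (store, drop, re-store) instead of tracking the current size per block. This is only a hypothesis about the metric, not confirmed Spark behavior; a toy sketch of the difference:

```scala
import scala.collection.mutable

// Hypothetical illustration: accumulating size on every block-status update
// inflates a metric, versus tracking the current size per block.
val currentSize = mutable.Map.empty[String, Long] // correct: current size per block
var accumulated = 0L                              // naive: sum of every update

def reportBlockUpdate(blockId: String, size: Long): Unit = {
  currentSize(blockId) = size
  accumulated += size
}

// The same 2 GB block re-stored 10 times (drop + re-cache cycles):
for (_ <- 1 to 10) reportBlockUpdate("rdd_1260_1", 2L)

println(currentSize.values.sum) // 2  -- what the cluster actually holds (GB)
println(accumulated)            // 20 -- what a naive accumulator would show (GB)
```

Under this hypothesis the reported size would keep growing with every drop/re-store cycle, which is consistent with the UI numbers increasing as queries run.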
That sums up my findings so far. I hope this matches the problem you have been seeing; if I have misunderstood anything, I would appreciate your clarification. Thanks.