Apache Spark warning: MemoryStore: Not enough space

Ale*_*tov 5 apache-spark

I'm using Sparkling Water and reading data from Parquet files.

Part of my spark-defaults.conf:

```
spark.serializer org.apache.spark.serializer.KryoSerializer
spark.kryoserializer.buffer.max 1g
spark.driver.memory 40g
spark.executor.memory 40g
spark.driver.maxResultSize 0
spark.python.worker.memory 30g
spark.executor.extraJavaOptions -XX:+PrintGCDetails -XX:+PrintGCTimeStamps -XX:+PrintTenuringDistribution
spark.storage.safetyFraction 0.9
spark.storage.memoryFraction 0.0
```
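One setting worth checking here is `spark.storage.memoryFraction 0.0`. Assuming the legacy static memory manager is in use (Spark 1.5 and earlier, or 1.6 with `spark.memory.useLegacyMode=true`), the cache size is roughly heap × `spark.storage.memoryFraction` × `spark.storage.safetyFraction`, and the space for unrolling blocks is `spark.storage.unrollFraction` (default 0.2) of that. The sketch below only does this arithmetic; the object and method names are hypothetical, and the real heap figure comes from `Runtime.getRuntime.maxMemory`, which is somewhat lower than `-Xmx`.

```scala
// Sketch of the legacy (static) storage-memory arithmetic, assuming Spark 1.5-era
// defaults. Object and method names are hypothetical, not Spark's API.
object LegacyStorageMemorySketch {
  // Cache size: heap * spark.storage.memoryFraction * spark.storage.safetyFraction
  def maxStorageBytes(heapBytes: Long, memoryFraction: Double, safetyFraction: Double): Long =
    (heapBytes * memoryFraction * safetyFraction).toLong

  // Unroll space: spark.storage.unrollFraction (assumed default 0.2) of the cache size
  def maxUnrollBytes(storageBytes: Long, unrollFraction: Double = 0.2): Long =
    (storageBytes * unrollFraction).toLong

  def main(args: Array[String]): Unit = {
    val mb = 1L << 20
    val heap = 40L * 1024 * mb          // roughly spark.executor.memory 40g
    val initialThreshold = 1024L * 1024 // assumed 1 MB initial unroll request

    // Compare the old default memoryFraction (0.6) with the 0.0 from the question
    for (memoryFraction <- Seq(0.6, 0.0)) {
      val storage = maxStorageBytes(heap, memoryFraction, safetyFraction = 0.9)
      val unroll = maxUnrollBytes(storage)
      println(s"memoryFraction=$memoryFraction: storage=${storage / mb} MB, " +
        s"unroll=${unroll / mb} MB, can reserve 1 MB: ${unroll >= initialThreshold}")
    }
  }
}
```

With `memoryFraction` at 0.0 both figures come out as zero, which would be consistent with the "Failed to reserve initial memory threshold" warnings in the log below, if the legacy manager is the one in use.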

    15/11/26 11:44:46 WARN MemoryStore: Not enough space to cache rdd_7_472 in memory! (computed 3.2 MB so far)
    15/11/26 11:44:46 WARN ParquetRecordReader: Can not initialize counter due to context is not a instance of TaskInputOutputContext, but is org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl
    15/11/26 11:44:46 WARN ParquetRecordReader: Can not initialize counter due to context is not a instance of TaskInputOutputContext, but is org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl
    15/11/26 11:44:46 WARN MemoryStore: Failed to reserve initial memory threshold of 1024.0 KB for computing block rdd_7_474 in memory.
    15/11/26 11:44:46 WARN MemoryStore: Failed to reserve initial memory threshold of 1024.0 KB for computing block rdd_7_475 in memory.

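In fact, Spark uses only part of the memory available to it, and there are many warnings about memory allocation. Spark starts writing data to the hard disk instead of using RAM. Why does this happen? Should I change something in the conf file? Also, how can I change the directory that Java uses as "tmp"?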

Thank you!

ton*_*n_K 0

> Spark starts writing data to the hard disk instead of using RAM. Why does this happen?

This is most likely because your persistence is configured with the `MEMORY_AND_DISK` storage level.
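A toy model of that decision: when a block does not fit in memory, Spark's block manager falls back to writing it to disk only if the storage level allows disk (as `MEMORY_AND_DISK` does); with `MEMORY_ONLY` the block is simply not cached and is recomputed when it is needed. The types below are hypothetical stand-ins, not Spark's `StorageLevel` API; in real code the level is chosen with something like `rdd.persist(StorageLevel.MEMORY_ONLY)`.

```scala
// Toy model (hypothetical names, not Spark code) of where a cached block ends up,
// depending on the storage level's useMemory/useDisk flags.
sealed trait Placement
case object InMemory extends Placement
case object OnDisk extends Placement
case object NotCached extends Placement

final case class ToyStorageLevel(useMemory: Boolean, useDisk: Boolean)

object PlacementSketch {
  val MemoryOnly = ToyStorageLevel(useMemory = true, useDisk = false)
  val MemoryAndDisk = ToyStorageLevel(useMemory = true, useDisk = true)

  def place(level: ToyStorageLevel, fitsInMemory: Boolean): Placement =
    if (level.useMemory && fitsInMemory) InMemory
    else if (level.useDisk) OnDisk // fall back to disk, as MEMORY_AND_DISK allows
    else NotCached                 // not stored; recomputed when needed

  def main(args: Array[String]): Unit =
    for (level <- Seq(MemoryOnly, MemoryAndDisk); fits <- Seq(true, false))
      println(s"$level fitsInMemory=$fits -> ${place(level, fits)}")
}
```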

From the documentation -> https://spark.apache.org/docs/latest/rdd-programming-guide.html#rdd-persistence

From the source code -> https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/storage/memory/MemoryStore.scala

    private case class DeserializedMemoryEntry[T](
        value: Array[T],
        size: Long,
        classTag: ClassTag[T]) extends MemoryEntry[T] {
      val memoryMode: MemoryMode = MemoryMode.ON_HEAP
    }

And this part:

    // Initial memory to request before unrolling any block
    private val unrollMemoryThreshold: Long =
      conf.get(STORAGE_UNROLL_MEMORY_THRESHOLD)

Further down, you will find this:

    // Whether there is still enough memory for us to continue unrolling this block
    var keepUnrolling = true
    // Initial per-task memory to request for unrolling blocks (bytes).
    val initialMemoryThreshold = unrollMemoryThreshold
    // How often to check whether we need to request more memory
    val memoryCheckPeriod = conf.get(UNROLL_MEMORY_CHECK_PERIOD)
    // Memory currently reserved by this task for this particular unrolling operation
    var memoryThreshold = initialMemoryThreshold
    // Memory to request as a multiple of current vector size
    val memoryGrowthFactor = conf.get(UNROLL_MEMORY_GROWTH_FACTOR)
    // Keep track of unroll memory used by this particular block / putIterator() operation
    var unrollMemoryUsedByThisBlock = 0L
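To see how these variables interact, here is a simplified, single-threaded re-creation of the unroll loop that uses them. As I read the Spark source, every `memoryCheckPeriod` elements the estimated size is compared with `memoryThreshold`, and once it is reached the task asks for `currentSize * memoryGrowthFactor - memoryThreshold` more bytes. The defaults below (1 MB, 16, 1.5) are assumptions about recent Spark versions; the uniform element size and the `tryReserve` callback (standing in for `reserveUnrollMemoryForThisTask`) are hypothetical.

```scala
// Simplified re-creation of the MemoryStore unroll loop (not Spark code).
// Assumptions: every element has the same estimated size, and the hypothetical
// `tryReserve` callback stands in for reserveUnrollMemoryForThisTask.
object UnrollLoopSketch {
  final case class Result(completed: Boolean, elementsUnrolled: Long, reservedBytes: Long)

  def unroll(
      numElements: Long,
      bytesPerElement: Long,
      tryReserve: Long => Boolean,
      initialMemoryThreshold: Long = 1024L * 1024, // assumed unroll threshold default
      memoryCheckPeriod: Long = 16L,               // assumed check period default
      memoryGrowthFactor: Double = 1.5             // assumed growth factor default
  ): Result = {
    // Request enough memory to begin unrolling
    var keepUnrolling = tryReserve(initialMemoryThreshold)
    var unrollMemoryUsedByThisBlock = if (keepUnrolling) initialMemoryThreshold else 0L
    var memoryThreshold = initialMemoryThreshold
    var elementsUnrolled = 0L

    while (elementsUnrolled < numElements && keepUnrolling) {
      // Estimated size of the vector after appending the current element
      val currentSize = (elementsUnrolled + 1) * bytesPerElement
      if (elementsUnrolled % memoryCheckPeriod == 0 && currentSize >= memoryThreshold) {
        // Grow the reservation so it covers currentSize * memoryGrowthFactor in total
        val amountToRequest = (currentSize * memoryGrowthFactor - memoryThreshold).toLong
        keepUnrolling = tryReserve(amountToRequest)
        if (keepUnrolling) unrollMemoryUsedByThisBlock += amountToRequest
        memoryThreshold += amountToRequest
      }
      elementsUnrolled += 1
    }
    Result(keepUnrolling, elementsUnrolled, unrollMemoryUsedByThisBlock)
  }

  def main(args: Array[String]): Unit = {
    var free = 8L * 1024 * 1024 // pretend only 8 MB of unroll memory is free
    val pool: Long => Boolean = bytes => if (bytes <= free) { free -= bytes; true } else false
    println(unroll(numElements = 100000L, bytesPerElement = 100L, tryReserve = pool))
  }
}
```

In the usage example 100,000 elements of 100 bytes need about 10 MB, so the growing reservation eventually fails against the 8 MB pool and unrolling stops early.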

And this is where the warning you are seeing comes from:

    // Request enough memory to begin unrolling
    keepUnrolling =
      reserveUnrollMemoryForThisTask(blockId, initialMemoryThreshold, memoryMode)

    if (!keepUnrolling) {
      logWarning(s"Failed to reserve initial memory threshold of " +
        s"${Utils.bytesToString(initialMemoryThreshold)} for computing block $blockId in memory.")
    } else {
      unrollMemoryUsedByThisBlock += initialMemoryThreshold
    }
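To illustrate why several blocks in a row can hit this warning, here is a toy model (hypothetical class and object names, not Spark code) of a small unroll pool shared by an executor's tasks: once earlier tasks hold the memory, each new block cannot get even the initial 1 MB and produces the same message as in the question's log. Unlike Spark, the toy never releases reservations.

```scala
import java.util.Locale
import scala.collection.mutable

// Toy model (hypothetical, not Spark code) of an unroll-memory pool shared by all
// tasks on one executor. Reservations are all-or-nothing and never released here.
final class ToyUnrollPool(capacityBytes: Long) {
  private val reservedByTask = mutable.Map.empty[Long, Long].withDefaultValue(0L)

  def free: Long = capacityBytes - reservedByTask.values.sum

  // Stand-in for reserveUnrollMemoryForThisTask
  def reserve(taskId: Long, bytes: Long): Boolean =
    if (bytes <= free) { reservedByTask(taskId) += bytes; true } else false
}

object InitialReservationSketch {
  val initialMemoryThreshold: Long = 1024L * 1024 // assumed 1 MB default threshold

  // Mirrors the snippet above: returns the warning text if the initial reservation fails
  def startUnrolling(pool: ToyUnrollPool, taskId: Long, blockId: String): Option[String] =
    if (pool.reserve(taskId, initialMemoryThreshold)) None
    else {
      val kb = "%.1f".formatLocal(Locale.ROOT, initialMemoryThreshold / 1024.0)
      Some(s"Failed to reserve initial memory threshold of $kb KB for computing block $blockId in memory.")
    }

  def main(args: Array[String]): Unit = {
    val pool = new ToyUnrollPool(capacityBytes = 2L * 1024 * 1024) // only 2 MB available
    val blocks = Seq("rdd_7_472", "rdd_7_473", "rdd_7_474", "rdd_7_475")
    for ((blockId, taskId) <- blocks.zipWithIndex) {
      val outcome = startUnrolling(pool, taskId.toLong, blockId).getOrElse("reserved 1 MB")
      println(s"$blockId: $outcome")
    }
  }
}
```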

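So either enable OFF_HEAP at the application level, as done in this blog post --> https://www.waitingforcode.com/apache-spark/apache-spark-off-heap-memory/read, or adjust the cluster/machine configuration and enable the relevant settings as described here --> https://spark.apache.org/docs/latest/configuration.html#memory-management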
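For sizing either option, the memory-management settings on that configuration page belong to the unified memory manager (Spark 1.6+). Roughly, on-heap execution and storage share (heap − 300 MB) × `spark.memory.fraction` (0.6 by default since Spark 2.0), of which `spark.memory.storageFraction` (default 0.5) is the part protected for cached blocks; with `spark.memory.offHeap.enabled=true`, an extra pool of `spark.memory.offHeap.size` bytes is available and split by the same storage fraction. This is a back-of-the-envelope sketch assuming those defaults; the helper names and the 10g off-heap size are hypothetical.

```scala
// Back-of-the-envelope sketch of the unified memory manager's arithmetic (Spark 1.6+).
// Assumed defaults: 300 MB reserved, spark.memory.fraction = 0.6,
// spark.memory.storageFraction = 0.5. Names are hypothetical, not Spark's API.
object UnifiedMemorySketch {
  val ReservedBytes: Long = 300L * 1024 * 1024

  // Memory shared by execution and storage on the JVM heap
  def unifiedBytes(heapBytes: Long, memoryFraction: Double = 0.6): Long =
    ((heapBytes - ReservedBytes) * memoryFraction).toLong

  // Part of a pool protected for cached blocks (a soft boundary: storage can
  // also borrow execution memory that is currently unused)
  def storageRegionBytes(poolBytes: Long, storageFraction: Double = 0.5): Long =
    (poolBytes * storageFraction).toLong

  def main(args: Array[String]): Unit = {
    val mb = 1024L * 1024
    val heap = 40L * 1024 * mb // spark.executor.memory 40g (real maxMemory is a bit lower)
    val onHeap = unifiedBytes(heap)
    println(s"on-heap unified: ${onHeap / mb} MB, storage region: ${storageRegionBytes(onHeap) / mb} MB")

    // Hypothetical: spark.memory.offHeap.enabled=true, spark.memory.offHeap.size=10g
    val offHeap = 10L * 1024 * mb
    println(s"off-heap storage region: ${storageRegionBytes(offHeap) / mb} MB")
  }
}
```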

Finally, if none of the above helps: in my case, simply restarting the nodes made the warnings go away.