I am using Sparkling Water and reading data from Parquet files.
Here is part of my spark-defaults.conf:
```
spark.serializer org.apache.spark.serializer.KryoSerializer
spark.kryoserializer.buffer.max 1g
spark.driver.memory 40g
spark.executor.memory 40g
spark.driver.maxResultSize 0
spark.python.worker.memory 30g
spark.executor.extraJavaOptions -XX:+PrintGCDetails -XX:+PrintGCTimeStamps -XX:+PrintTenuringDistribution
spark.storage.safetyFraction 0.9
spark.storage.memoryFraction 0.0
```
The log shows warnings like these:

```
15/11/26 11:44:46 WARN MemoryStore: Not enough space to cache rdd_7_472 in memory! (computed 3.2 MB so far)
15/11/26 11:44:46 WARN ParquetRecordReader: Can not initialize counter due to context is not a instance of TaskInputOutputContext, but is org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl
15/11/26 11:44:46 WARN ParquetRecordReader: Can not initialize counter due to context is not a instance of TaskInputOutputContext, but is org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl
15/11/26 11:44:46 WARN MemoryStore: Failed to reserve initial memory threshold of 1024.0 KB for computing block rdd_7_474 in memory.
15/11/26 11:44:46 WARN MemoryStore: Failed to reserve initial memory threshold of 1024.0 KB for computing block rdd_7_475 in memory.
```
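In practice, Spark uses only part of the memory available to it, and the log is full of memory-allocation warnings. Spark starts writing data to disk instead of using RAM. Why does this happen? Should I change something in the conf file? And how can I change the directory that Java uses for "tmp"?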
Thanks!
> Spark starts writing data to disk instead of using RAM. Why does this happen?
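This is most likely because your data is persisted with the MEMORY_AND_DISK storage level: partitions that do not fit in memory are written to disk and read back from there when they are needed.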
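To make that behaviour concrete, here is a toy model in plain Scala with no Spark dependency. It is a hypothetical sketch, not Spark's MemoryStore: `ToyCache`, `CacheResult`, `Level` and the byte sizes are made up for illustration, and the model ignores eviction of already-cached blocks. Partitions are cached in order while they fit in the memory budget; under MEMORY_AND_DISK the rest go to disk, while under MEMORY_ONLY they are simply not cached and would be recomputed on the next access.

```scala
// Toy model, NOT Spark's MemoryStore: caches partitions under a fixed memory
// budget to show the difference between the two storage levels.
sealed trait Level
case object MemoryOnly extends Level
case object MemoryAndDisk extends Level

// Partition indices, grouped by where each partition ended up.
final case class CacheResult(inMemory: Vector[Int], onDisk: Vector[Int], notCached: Vector[Int])

object ToyCache {
  def cache(partitionSizes: Seq[Long], budget: Long, level: Level): CacheResult = {
    var free = budget
    val inMemory = Vector.newBuilder[Int]
    val onDisk = Vector.newBuilder[Int]
    val notCached = Vector.newBuilder[Int]
    for ((size, idx) <- partitionSizes.zipWithIndex) {
      if (size <= free) {
        // Fits: keep it in memory and shrink the remaining budget.
        free -= size
        inMemory += idx
      } else {
        level match {
          case MemoryAndDisk => onDisk += idx    // spilled to disk, read back later
          case MemoryOnly    => notCached += idx // recomputed on the next access
        }
      }
    }
    CacheResult(inMemory.result(), onDisk.result(), notCached.result())
  }
}
```

For example, `ToyCache.cache(Seq(40L, 40L, 40L), 100L, MemoryAndDisk)` keeps partitions 0 and 1 in memory and puts partition 2 on disk; with `MemoryOnly`, partition 2 is not cached at all.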
From the documentation: https://spark.apache.org/docs/latest/rdd-programming-guide.html#rdd-persistence
From the source code (MemoryStore.scala): https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/storage/memory/MemoryStore.scala
```scala
private case class DeserializedMemoryEntry[T](
    value: Array[T],
    size: Long,
    classTag: ClassTag[T]) extends MemoryEntry[T] {
  val memoryMode: MemoryMode = MemoryMode.ON_HEAP
}
```
And this part:
```scala
// Initial memory to request before unrolling any block
private val unrollMemoryThreshold: Long =
  conf.get(STORAGE_UNROLL_MEMORY_THRESHOLD)
```
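According to the configuration documentation, `spark.storage.unrollMemoryThreshold` (the key behind `STORAGE_UNROLL_MEMORY_THRESHOLD`) defaults to 1024 * 1024 bytes, which is exactly the "1024.0 KB" in the question's warnings. The warning formats the value with `Utils.bytesToString`, which, as far as I can tell from Spark's source, switches to a larger unit only once the value reaches twice that unit. Below is a re-implementation sketch of that formatting rule for illustration; `ByteFormat` is a hypothetical name and this is not Spark's code.

```scala
import java.util.Locale

// Sketch modeled on Spark's Utils.bytesToString (not copied from Spark):
// a unit is used only when the value is at least twice that unit,
// so 1 MiB is printed in KB rather than as "1.0 MB".
object ByteFormat {
  private val KB = 1L << 10
  private val MB = 1L << 20
  private val GB = 1L << 30
  private val TB = 1L << 40

  def bytesToString(size: Long): String = {
    val (value, unit) =
      if (size >= 2 * TB) (size.toDouble / TB, "TB")
      else if (size >= 2 * GB) (size.toDouble / GB, "GB")
      else if (size >= 2 * MB) (size.toDouble / MB, "MB")
      else if (size >= 2 * KB) (size.toDouble / KB, "KB")
      else (size.toDouble, "B")
    "%.1f %s".formatLocal(Locale.US, value, unit)
  }
}
```

With this rule, `ByteFormat.bytesToString(1024L * 1024L)` yields "1024.0 KB", matching the log line above.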
Further down, you will find this:
```scala
// Whether there is still enough memory for us to continue unrolling this block
var keepUnrolling = true
// Initial per-task memory to request for unrolling blocks (bytes).
val initialMemoryThreshold = unrollMemoryThreshold
// How often to check whether we need to request more memory
val memoryCheckPeriod = conf.get(UNROLL_MEMORY_CHECK_PERIOD)
// Memory currently reserved by this task for this particular unrolling operation
var memoryThreshold = initialMemoryThreshold
// Memory to request as a multiple of current vector size
val memoryGrowthFactor = conf.get(UNROLL_MEMORY_GROWTH_FACTOR)
// Keep track of unroll memory used by this particular block / putIterator() operation
var unrollMemoryUsedByThisBlock = 0L
```
This is where the warning you are seeing comes from:
```scala
// Request enough memory to begin unrolling
keepUnrolling =
  reserveUnrollMemoryForThisTask(blockId, initialMemoryThreshold, memoryMode)
if (!keepUnrolling) {
  logWarning(s"Failed to reserve initial memory threshold of " +
    s"${Utils.bytesToString(initialMemoryThreshold)} for computing block $blockId in memory.")
} else {
  unrollMemoryUsedByThisBlock += initialMemoryThreshold
}
```
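Putting the quoted variables together, the unrolling logic can be sketched as a small, self-contained model. This is a simplified, hypothetical model, not Spark's code: `UnrollModel`, `Outcome`, `reserve` and the record sizes are illustrative, the size estimate is an exact running sum instead of Spark's size estimator, and the growth rule follows my reading of the loop that comes after the quoted lines in MemoryStore.scala. The default values assume 1 MB for `spark.storage.unrollMemoryThreshold` (documented) and, as far as I know, 16 and 1.5 for the internal `spark.storage.unrollMemoryCheckPeriod` and `spark.storage.unrollMemoryGrowthFactor` settings.

```scala
// Hypothetical, simplified model of MemoryStore's unrolling; NOT Spark's code.
object UnrollModel {
  final case class Outcome(fullyUnrolled: Boolean, elementsUnrolled: Int, reservedBytes: Long)

  def unroll(
      recordSizes: Seq[Long],                       // bytes per record (made-up input)
      freePoolBytes: Long,                          // unroll memory still available to the task
      initialMemoryThreshold: Long = 1024L * 1024L, // assumed spark.storage.unrollMemoryThreshold
      memoryCheckPeriod: Int = 16,                  // assumed spark.storage.unrollMemoryCheckPeriod
      memoryGrowthFactor: Double = 1.5              // assumed spark.storage.unrollMemoryGrowthFactor
  ): Outcome = {
    var free = freePoolBytes
    // Stand-in for reserveUnrollMemoryForThisTask: succeed only if the pool has room.
    def reserve(bytes: Long): Boolean =
      if (bytes <= free) { free -= bytes; true } else false

    // Request enough memory to begin unrolling; failing here corresponds to
    // the "Failed to reserve initial memory threshold" warning.
    var keepUnrolling = reserve(initialMemoryThreshold)
    var reserved = if (keepUnrolling) initialMemoryThreshold else 0L
    var memoryThreshold = initialMemoryThreshold
    var currentSize = 0L
    var elementsUnrolled = 0
    val records = recordSizes.iterator

    while (records.hasNext && keepUnrolling) {
      currentSize += records.next()
      elementsUnrolled += 1
      // Only re-check the size every memoryCheckPeriod records.
      if (elementsUnrolled % memoryCheckPeriod == 0 && currentSize >= memoryThreshold) {
        // Grow the reservation so the threshold becomes currentSize * memoryGrowthFactor.
        val amountToRequest = (currentSize * memoryGrowthFactor - memoryThreshold).toLong
        keepUnrolling = reserve(amountToRequest)
        if (keepUnrolling) reserved += amountToRequest
        memoryThreshold += amountToRequest
      }
    }
    Outcome(keepUnrolling && !records.hasNext, elementsUnrolled, reserved)
  }
}
```

When the free pool is smaller than the initial 1 MB, `unroll` stops before the first record. That is the situation the warning reports; roughly speaking, with MEMORY_AND_DISK Spark then falls back to writing such a block to disk.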
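So either enable off-heap memory (OFF_HEAP) at the application level, as done in this blog post: https://www.waitingforcode.com/apache-spark/apache-spark-off-heap-memory/read, or adjust the cluster/machine configuration and enable the corresponding settings as described here: https://spark.apache.org/docs/latest/configuration.html#memory-management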
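For reference, the off-heap settings from that memory-management section can go into spark-defaults.conf, the same file the question already uses. This is a minimal sketch, assuming Spark 1.6 or later (where these keys exist); the `4g` size is an arbitrary placeholder, not a recommendation:

```
spark.memory.offHeap.enabled true
spark.memory.offHeap.size 4g
```

`spark.memory.offHeap.size` must be positive when off-heap memory is enabled. Because this memory lives outside the JVM heap, it comes in addition to `spark.executor.memory`, so the machine needs room for both. As I understand it, in Spark 2.x and later cached blocks only use it when they are persisted with `StorageLevel.OFF_HEAP`.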
Finally, if none of the above helps: in my case, restarting the nodes made the warnings go away.