当从文本文件或集合(或从另一个RDD)创建弹性分布式数据集(RDD)时,我们是否需要显式调用"cache"或"persist"来将RDD数据存储到内存中?或者默认情况下RDD数据是以分布式方式存储在内存中的吗?
val textFile = sc.textFile("/user/emp.txt")
Run Code Online (Sandbox Code Playgroud)
根据我的理解,在上面的步骤之后,textFile是一个RDD,并且可以在节点的所有/部分内存中使用.
如果是这样,为什么我们需要在textFile RDD上调用"cache"或"persist"呢?
看起来广播方法在我的集群中生成RDD的分布式副本.另一方面,cache()方法的执行只是将数据加载到内存中.
但我不明白缓存的RDD是如何在集群中分布的.
你能告诉我在哪些情况下我应该使用rdd.cache()和rdd.broadcast()方法吗?
当我尝试缓存()或持久化(MEMORY_ONLY_SER())我的RDD时,我的spark簇会挂起.它工作得很好,并在大约7分钟内计算结果.如果我不使用cache().
我有6个c3.xlarge EC2实例(4个内核,每个7.5 GB RAM),共有24个内核和37.7 GB.
我在master上使用以下命令运行我的应用程序:
SPARK_MEM = 5g MEMORY_FRACTION ="0.6"SPARK_HOME ="/ root/spark"java -cp ./uber-offline.jar:/root/spark/assembly/target/scala-2.10/spark-assembly_2.10-0.9.0-孵化-hadoop1.0.4.jar pl.instream.dsp.offline.OfflineAnalysis
数据集大约有50GB的数据被分成24个文件.我将其压缩并存储在24个存储桶中的24个文件中(每个文件大小为7MB到300MB).
我绝对找不到我的集群这种行为的原因,但似乎火花消耗了所有可用的内存并进入了GC收集循环.当我查看gc verbose时,我可以找到如下的循环:
[GC 5208198K(5208832K), 0,2403780 secs]
[Full GC 5208831K->5208212K(5208832K), 9,8765730 secs]
[Full GC 5208829K->5208238K(5208832K), 9,7567820 secs]
[Full GC 5208829K->5208295K(5208832K), 9,7629460 secs]
[GC 5208301K(5208832K), 0,2403480 secs]
[Full GC 5208831K->5208344K(5208832K), 9,7497710 secs]
[Full GC 5208829K->5208366K(5208832K), 9,7542880 secs]
[Full GC 5208831K->5208415K(5208832K), 9,7574860 secs]
Run Code Online (Sandbox Code Playgroud)
这最终会导致如下消息:
WARN storage.BlockManagerMasterActor: Removing BlockManager BlockManagerId(0, ip-xx-xx-xxx-xxx.eu-west-1.compute.internal, 60048, 0) with no recent heart beats: 64828ms exceeds 45000ms
Run Code Online (Sandbox Code Playgroud)
...并停止计算方面的任何进展.这看起来像100%消耗的内存,但我试图使用具有更多RAM(每个30GB)的机器,效果是相同的.
这种行为可能是什么原因?有人可以帮忙吗?
我发现当.map( identity ).cache在rdd上使用时,如果物品很大,它会变得很慢.虽然它几乎是瞬间的.
注意:这可能与这个问题有关,但在这里我提供了一个非常精确的例子(可以直接在spark-shell中执行):
// simple function to profile execution time (in ms)
def profile[R](code: => R): R = {
val t = System.nanoTime
val out = code
println(s"time = ${(System.nanoTime - t)/1000000}ms")
out
}
// create some big size item
def bigContent() = (1 to 1000).map( i => (1 to 1000).map( j => (i,j) ).toMap )
// create rdd
val n = 1000 // size of the rdd
val rdd = sc.parallelize(1 to n).map( k …Run Code Online (Sandbox Code Playgroud)