相关疑难解决方法(0)

(为什么)我们需要在RDD上调用缓存或持久化

当从文本文件或集合(或从另一个RDD)创建弹性分布式数据集(RDD)时,我们是否需要显式调用"cache"或"persist"来将RDD数据存储到内存中?或者默认情况下RDD数据是以分布式方式存储在内存中的吗?

val textFile = sc.textFile("/user/emp.txt")
Run Code Online (Sandbox Code Playgroud)

根据我的理解,在上面的步骤之后,textFile是一个RDD,并且可以在节点的所有/部分内存中使用.

如果是这样,为什么我们需要在textFile RDD上调用"cache"或"persist"呢?

scala apache-spark rdd

161
推荐指数
5
解决办法
7万
查看次数

Spark cache vs broadcast

看起来广播方法在我的集群中生成RDD的分布式副本.另一方面,cache()方法的执行只是将数据加载到内存中.

但我不明白缓存的RDD是如何在集群中分布的.

你能告诉我在哪些情况下我应该使用rdd.cache()rdd.broadcast()方法吗?

caching apache-spark

20
推荐指数
3
解决办法
1万
查看次数

缓存()/ persist()的apache-spark内存消耗

当我尝试缓存()或持久化(MEMORY_ONLY_SER())我的RDD时,我的spark簇会挂起.它工作得很好,并在大约7分钟内计算结果.如果我不使用cache().

我有6个c3.xlarge EC2实例(4个内核,每个7.5 GB RAM),共有24个内核和37.7 GB.

我在master上使用以下命令运行我的应用程序:

SPARK_MEM = 5g MEMORY_FRACTION ="0.6"SPARK_HOME ="/ root/spark"java -cp ./uber-offline.jar:/root/spark/assembly/target/scala-2.10/spark-assembly_2.10-0.9.0-孵化-hadoop1.0.4.jar pl.instream.dsp.offline.OfflineAnalysis

数据集大约有50GB的数据被分成24个文件.我将其压缩并存储在24个存储桶中的24个文件中(每个文件大小为7MB到300MB).

我绝对找不到我的集群这种行为的原因,但似乎火花消耗了所有可用的内存并进入了GC收集循环.当我查看gc verbose时,我可以找到如下的循环:

[GC 5208198K(5208832K), 0,2403780 secs]
[Full GC 5208831K->5208212K(5208832K), 9,8765730 secs]
[Full GC 5208829K->5208238K(5208832K), 9,7567820 secs]
[Full GC 5208829K->5208295K(5208832K), 9,7629460 secs]
[GC 5208301K(5208832K), 0,2403480 secs]
[Full GC 5208831K->5208344K(5208832K), 9,7497710 secs]
[Full GC 5208829K->5208366K(5208832K), 9,7542880 secs]
[Full GC 5208831K->5208415K(5208832K), 9,7574860 secs]
Run Code Online (Sandbox Code Playgroud)

这最终会导致如下消息:

WARN storage.BlockManagerMasterActor: Removing BlockManager BlockManagerId(0, ip-xx-xx-xxx-xxx.eu-west-1.compute.internal, 60048, 0) with no recent heart beats: 64828ms exceeds 45000ms
Run Code Online (Sandbox Code Playgroud)

...并停止计算方面的任何进展.这看起来像100%消耗的内存,但我试图使用具有更多RAM(每个30GB)的机器,效果是相同的.

这种行为可能是什么原因?有人可以帮忙吗?

java garbage-collection apache-spark

3
推荐指数
1
解决办法
6696
查看次数

当rdd项很大时,为什么rdd.map(identity).cache会变慢?

我发现当.map( identity ).cache在rdd上使用时,如果物品很大,它会变得很慢.虽然它几乎是瞬间的.

注意:这可能与这个问题有关,但在这里我提供了一个非常精确的例子(可以直接在spark-shell中执行):

// simple function to profile execution time (in ms)
def profile[R](code: => R): R = {
  val t = System.nanoTime
  val out = code
  println(s"time = ${(System.nanoTime - t)/1000000}ms")
  out
}

// create some big size item
def bigContent() = (1 to 1000).map( i => (1 to 1000).map( j => (i,j) ).toMap )

// create rdd
val n = 1000 // size of the rdd

val rdd = sc.parallelize(1 to n).map( k …
Run Code Online (Sandbox Code Playgroud)

performance caching apache-spark

2
推荐指数
1
解决办法
2126
查看次数