When creating a resilient distributed dataset (RDD) from a text file or a collection (or from another RDD), do we need to explicitly call "cache" or "persist" to store the RDD data in memory? Or is the RDD data stored in memory in a distributed way by default?
val textFile = sc.textFile("/user/emp.txt")
As I understand it, after the step above textFile is an RDD and its data is available in the memory of all or some of the nodes.
If that is the case, why do we need to call "cache" or "persist" on the textFile RDD at all?
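For reference, a minimal sketch of what the explicit call would look like on this RDD (the storage level is shown only for illustration; cache() is shorthand for persist with MEMORY_ONLY):

import org.apache.spark.storage.StorageLevel

val textFile = sc.textFile("/user/emp.txt") // only defines the RDD; nothing is read yet
textFile.persist(StorageLevel.MEMORY_ONLY)  // same effect as textFile.cache()
textFile.count()                            // the first action reads the file and populates the cache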
I am trying to override the default configuration of the Spark session / Spark context, but it keeps picking up all of the node/cluster resources.
from pyspark.sql import SparkSession

spark = SparkSession.builder \
    .master("ip") \
    .enableHiveSupport() \
    .getOrCreate()

spark.conf.set("spark.executor.memory", "8g")
spark.conf.set("spark.executor.cores", "3")
spark.conf.set("spark.cores.max", "3")
spark.conf.set("spark.driver.memory", "8g")

sc = spark.sparkContext
When I pass the configuration to spark-submit instead, it works fine:
spark-submit --master ip --executor-cores=3 --driver-memory 10G code.py
I have a CSV file with about 5000 rows and 950 columns. First I load it into a DataFrame:
val data = sqlContext.read
  .format(csvFormat)
  .option("header", "true")
  .option("inferSchema", "true")
  .load(file)
  .cache()
After that, I collect all the string columns:
val featuresToIndex = data.schema
  .filter(_.dataType == StringType)
  .map(field => field.name)
and I want to index them. To do that, I create a StringIndexer for each string column:
val stringIndexers = featuresToIndex.map(colName =>
  new StringIndexer()
    .setInputCol(colName)
    .setOutputCol(colName + "Indexed"))
and create a pipeline:
val pipeline = new Pipeline().setStages(stringIndexers.toArray)
But when I try to transform my initial DataFrame with this pipeline,
val indexedDf = pipeline.fit(data).transform(data)
I get a StackOverflowError:
16/07/05 16:55:12 INFO DAGScheduler: Job 4 finished: countByValue at StringIndexer.scala:86, took 7.882774 s
Exception in thread "main" java.lang.StackOverflowError
at scala.collection.immutable.Set$Set1.contains(Set.scala:84)
at scala.collection.immutable.Set$Set1.$plus(Set.scala:86)
at scala.collection.immutable.Set$Set1.$plus(Set.scala:81)
at scala.collection.mutable.SetBuilder.$plus$eq(SetBuilder.scala:22)
at scala.collection.mutable.SetBuilder.$plus$eq(SetBuilder.scala:20)
at scala.collection.generic.Growable$class.loop$1(Growable.scala:53)
at scala.collection.generic.Growable$class.$plus$plus$eq(Growable.scala:57)
at …

When I use "++" to combine a large number of RDDs, I get a StackOverflowError.
Spark version 1.3.1, running in yarn-client mode with --driver-memory 8G.
There are more than 4000 RDDs, and each RDD is read from a text file of about 1 GB.
The combined RDD is generated like this:
val collection = (for (
  path <- files
) yield sc.textFile(path)).reduce(_ union _)
This works fine when files is small. With this many files, however, I get the error below. The same frames repeat throughout the trace; I guess some recursive function is being called too many times?
Exception at org.apache.spark.rdd.RDD.partitions(RDD.scala:217)
at org.apache.spark.rdd.UnionRDD$$anonfun$1.apply(UnionRDD.scala:66)
at org.apache.spark.rdd.UnionRDD$$anonfun$1.apply(UnionRDD.scala:66)
at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
at scala.collection.mutable.WrappedArray.foreach(WrappedArray.scala:34)
at scala.collection.TraversableLike$class.map(TraversableLike.scala:244)
at scala.collection.AbstractTraversable.map(Traversable.scala:105)
at org.apache.spark.rdd.UnionRDD.getPartitions(UnionRDD.scala:66)
at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:219)
at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:217)
at scala.Option.getOrElse(Option.scala:120)
at org.apache.spark.rdd.RDD.partitions(RDD.scala:217)
at org.apache.spark.rdd.UnionRDD$$anonfun$1.apply(UnionRDD.scala:66)
at org.apache.spark.rdd.UnionRDD$$anonfun$1.apply(UnionRDD.scala:66)
at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
at scala.collection.mutable.WrappedArray.foreach(WrappedArray.scala:34)
at scala.collection.TraversableLike$class.map(TraversableLike.scala:244)
at scala.collection.AbstractTraversable.map(Traversable.scala:105)
at org.apache.spark.rdd.UnionRDD.getPartitions(UnionRDD.scala:66)
at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:219)
at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:217)
at scala.Option.getOrElse(Option.scala:120)
.....
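For comparison, here is a hedged sketch of building the same union in a single call rather than as a chain of pairwise unions; SparkContext.union produces one UnionRDD whose parents are all the inputs, so the lineage stays flat (files is the same sequence of paths as above):

// one RDD per input file, exactly as before
val rdds = for (path <- files) yield sc.textFile(path)
// a single UnionRDD over all inputs, instead of the nested chain from reduce(_ union _)
val collection = sc.union(rdds.toSeq)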
I have a Spark Scala program which uses a REST API to get data batch by batch, and once all the data is retrieved I operate on them.
Current Program:
1. For each batch, create an RDD and merge it with the RDD built up from the previous API calls using rdd.union(currentRdd) (a sketch of this loop follows below).
2. Operate on the final RDD.
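A rough sketch of this pattern, assuming an existing SparkContext sc and using fetchBatch and numBatches as hypothetical placeholders for the REST call and the number of batches:

// accumulate batches by repeatedly unioning onto the RDD built so far
var merged = sc.emptyRDD[String]
for (batch <- 1 to numBatches) {
  val currentRdd = sc.parallelize(fetchBatch(batch)) // fetchBatch: hypothetical REST helper returning one batch
  merged = merged.union(currentRdd)
}
merged.count() // operate on the final RDD (any action)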
A simple program to reproduce the issue:
def main(args: Array[String]) = {
  val conf = new SparkConf().setAppName("Union test").setMaster("local[1]")
  val sc = new SparkContext(conf)
  val limit = …