Related troubleshooting questions (0)

(Why) do we need to call cache or persist on an RDD

When a resilient distributed dataset (RDD) is created from a text file or a collection (or from another RDD), do we need to call "cache" or "persist" explicitly to store the RDD data in memory? Or is the RDD data stored in memory in a distributed way by default?

val textFile = sc.textFile("/user/emp.txt")

From my understanding, after the step above, textFile is an RDD and is available in the memory of all/some of the nodes.

If so, why do we need to call "cache" or "persist" on the textFile RDD at all?
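Not from the original question, but a minimal Scala sketch of the point at issue: RDDs are evaluated lazily, so sc.textFile only records the lineage and loads nothing into memory; without cache/persist, every action recomputes the RDD from its source.

val textFile = sc.textFile("/user/emp.txt")   // lazy: only the lineage is recorded, no data is read yet
val cached = textFile.cache()                 // marks the RDD for in-memory storage; still nothing is read

val lineCount = cached.count()                // first action: reads the file and fills the cache
val charCount = cached.map(_.length).sum()    // second action: reuses the cached partitions instead of re-reading the file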

scala apache-spark rdd

161 votes · 5 answers · 70,000 views

spark 2.1.0 session config settings (pyspark)

I am trying to override the Spark session / Spark context default configuration, but it picks up the resources of the entire node/cluster.

from pyspark.sql import SparkSession

spark = (SparkSession.builder
         .master("ip")
         .enableHiveSupport()
         .getOrCreate())

spark.conf.set("spark.executor.memory", '8g')
spark.conf.set('spark.executor.cores', '3')
spark.conf.set('spark.cores.max', '3')
spark.conf.set("spark.driver.memory", '8g')
sc = spark.sparkContext

When I put the configuration in spark-submit, it works fine:

spark-submit --master ip --executor-cores=3 --driver-memory 10G code.py
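Not from the original question, but a sketch of the usual fix: properties such as spark.executor.memory and spark.cores.max must be fixed before the SparkSession (and its SparkContext) is created, so they should be passed to the builder rather than set with spark.conf.set() afterwards. The sketch below is in Scala like the other examples on this page; the PySpark builder exposes the same config() method, and the master value is a placeholder.

import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder()
  .master("yarn")                          // placeholder; use your actual cluster master
  .config("spark.executor.memory", "8g")   // set before getOrCreate(), not after
  .config("spark.executor.cores", "3")
  .config("spark.cores.max", "3")
  .enableHiveSupport()
  .getOrCreate()

// spark.driver.memory usually cannot be changed from inside an already-running
// driver process, which is why passing it to spark-submit works.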

python apache-spark pyspark spark-dataframe

31 votes · 2 answers · 40,000 views

Apache Spark: StackOverflowError when trying to index string columns

I have a csv file with about 5000 rows and 950 columns. First I load it into a DataFrame:

val data = sqlContext.read
  .format(csvFormat)
  .option("header", "true")
  .option("inferSchema", "true")
  .load(file)
  .cache()

After that I look up all the string columns:

val featuresToIndex = data.schema
  .filter(_.dataType == StringType)
  .map(field => field.name)

and want to index them. To do this, I create an indexer for each string column:

val stringIndexers = featuresToIndex.map(colName =>
  new StringIndexer()
    .setInputCol(colName)
    .setOutputCol(colName + "Indexed"))

and create a pipeline:

val pipeline = new Pipeline().setStages(stringIndexers.toArray)

But when I try to transform my initial DataFrame with this pipeline,

val indexedDf = pipeline.fit(data).transform(data)

I get a StackOverflowError:

16/07/05 16:55:12 INFO DAGScheduler: Job 4 finished: countByValue at StringIndexer.scala:86, took 7.882774 s
Exception in thread "main" java.lang.StackOverflowError
at scala.collection.immutable.Set$Set1.contains(Set.scala:84)
at scala.collection.immutable.Set$Set1.$plus(Set.scala:86)
at scala.collection.immutable.Set$Set1.$plus(Set.scala:81)
at scala.collection.mutable.SetBuilder.$plus$eq(SetBuilder.scala:22)
at scala.collection.mutable.SetBuilder.$plus$eq(SetBuilder.scala:20)
at scala.collection.generic.Growable$class.loop$1(Growable.scala:53)
at scala.collection.generic.Growable$class.$plus$plus$eq(Growable.scala:57)
at …
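Not from the original post, but two commonly suggested workarounds for a StackOverflowError with very wide DataFrames: raise the JVM thread stack size, or truncate the query lineage with a checkpoint before fitting (Dataset.checkpoint assumes Spark 2.1+). A rough Scala sketch:

// 1) Larger thread stacks, passed at submit time:
//    spark-submit --conf spark.driver.extraJavaOptions=-Xss16m \
//                 --conf spark.executor.extraJavaOptions=-Xss16m ...

// 2) Cut the lineage before fitting the pipeline:
sc.setCheckpointDir("/tmp/spark-checkpoints")        // any reliable directory
val checkpointed = data.checkpoint()                 // materializes the data and truncates the plan
val indexedDf = pipeline.fit(checkpointed).transform(checkpointed)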

java scala apache-spark apache-spark-mllib

18 votes · 2 answers · 3,875 views

Stack overflow error when unioning a large number of RDDs

When I use "++" to combine a large number of RDDs, I get a stack overflow error.

Spark version: 1.3.1. Environment: yarn-client, --driver-memory 8G

There are more than 4000 RDDs. Each RDD is read from a text file of about 1 GB.

They are generated this way:

val collection = (for (
  path <- files
) yield sc.textFile(path)).reduce(_ union _)

It works fine when files is small in size. And here is the error:

The error repeats itself. I guess it is a recursive function being called too many times?

 Exception at org.apache.spark.rdd.RDD.partitions(RDD.scala:217)
    at org.apache.spark.rdd.UnionRDD$$anonfun$1.apply(UnionRDD.scala:66)
    at org.apache.spark.rdd.UnionRDD$$anonfun$1.apply(UnionRDD.scala:66)
    at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
    at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
    at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
    at scala.collection.mutable.WrappedArray.foreach(WrappedArray.scala:34)
    at scala.collection.TraversableLike$class.map(TraversableLike.scala:244)
    at scala.collection.AbstractTraversable.map(Traversable.scala:105)
    at org.apache.spark.rdd.UnionRDD.getPartitions(UnionRDD.scala:66)
    at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:219)
    at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:217)
    at scala.Option.getOrElse(Option.scala:120)
    at org.apache.spark.rdd.RDD.partitions(RDD.scala:217)
    at org.apache.spark.rdd.UnionRDD$$anonfun$1.apply(UnionRDD.scala:66)
    at org.apache.spark.rdd.UnionRDD$$anonfun$1.apply(UnionRDD.scala:66)
    at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
    at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
    at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
    at scala.collection.mutable.WrappedArray.foreach(WrappedArray.scala:34)
    at scala.collection.TraversableLike$class.map(TraversableLike.scala:244)
    at scala.collection.AbstractTraversable.map(Traversable.scala:105)
    at org.apache.spark.rdd.UnionRDD.getPartitions(UnionRDD.scala:66)
    at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:219)
    at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:217)
    at scala.Option.getOrElse(Option.scala:120)
  .....
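Not from the original post, but the usual suggestion for this pattern: reduce(_ union _) nests more than 4000 UnionRDDs, so getPartitions recurses once per level, which matches the repeated frames above. SparkContext.union builds a single flat UnionRDD over all inputs instead. A rough Scala sketch:

val rdds = files.map(path => sc.textFile(path))
val collection = sc.union(rdds)    // one flat UnionRDD, no deep recursion over nested unions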

apache-spark rdd

15 votes · 1 answer · 6,491 views

How to resolve Apache Spark StackOverflowError after multiple unions

I have a Spark Scala program which uses a REST API to get data batch by batch, and once all the data is retrieved I operate on them.

Current Program:

  • For each batch, create an RDD and merge it with the RDD accumulated from the previous API calls using rdd.union(currentRdd).

  • Operate on final RDD

A simple program to reproduce the issue:

    def main(args: Array[String]) = {
     val conf = new SparkConf().setAppName("Union test").setMaster("local[1]")
     val sc = new SparkContext(conf)
     val limit = …
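Not from the original question, but the same idea applies here: instead of chaining rdd.union(currentRdd) on every API call, which grows the lineage by one level per batch, collect the batch RDDs and union them once at the end. A rough Scala sketch with a hypothetical fetchBatch helper standing in for the REST call:

import scala.collection.mutable.ArrayBuffer
import org.apache.spark.rdd.RDD

val batches = ArrayBuffer.empty[RDD[String]]
var page = 0
var morePages = true
while (morePages) {
  val batch: Seq[String] = fetchBatch(page)   // hypothetical: one page of results from the REST API
  if (batch.isEmpty) morePages = false
  else { batches += sc.parallelize(batch); page += 1 }
}
val all = sc.union(batches)                   // single flat union, shallow lineage
println(all.count())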

scala apache-spark rdd

3 votes · 1 answer · 58 views