When I use "++" to combine a large number of RDDs, I get a stack overflow error.
Spark version 1.3.1, environment: yarn-client, --driver-memory 8G.
The number of RDDs is more than 4000, and each RDD is read from a text file of size 1 GB.
They are generated this way:
val collection = (for (
  path <- files
) yield sc.textFile(path)).reduce(_ union _)
It works fine when files is small, but with many files it fails with the error below. The error pattern repeats itself; I guess it is a recursive function that is being called too many times?
Exception at org.apache.spark.rdd.RDD.partitions(RDD.scala:217)
at org.apache.spark.rdd.UnionRDD$$anonfun$1.apply(UnionRDD.scala:66)
at org.apache.spark.rdd.UnionRDD$$anonfun$1.apply(UnionRDD.scala:66)
at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
at scala.collection.mutable.WrappedArray.foreach(WrappedArray.scala:34)
at scala.collection.TraversableLike$class.map(TraversableLike.scala:244)
at scala.collection.AbstractTraversable.map(Traversable.scala:105)
at org.apache.spark.rdd.UnionRDD.getPartitions(UnionRDD.scala:66)
at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:219)
at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:217)
at scala.Option.getOrElse(Option.scala:120)
at org.apache.spark.rdd.RDD.partitions(RDD.scala:217)
at org.apache.spark.rdd.UnionRDD$$anonfun$1.apply(UnionRDD.scala:66)
at org.apache.spark.rdd.UnionRDD$$anonfun$1.apply(UnionRDD.scala:66)
at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
at scala.collection.mutable.WrappedArray.foreach(WrappedArray.scala:34)
at scala.collection.TraversableLike$class.map(TraversableLike.scala:244)
at scala.collection.AbstractTraversable.map(Traversable.scala:105)
at org.apache.spark.rdd.UnionRDD.getPartitions(UnionRDD.scala:66)
at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:219)
at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:217)
at scala.Option.getOrElse(Option.scala:120)
.....
Answer by Sea*_*wen (21 votes):
Use SparkContext.union(...) instead, to union many RDDs at once.

You don't want to do it one at a time like that, because RDD.union() creates a new step in the lineage (an extra set of stack frames on any computation) for each RDD, whereas SparkContext.union() does it all in one step. This ensures you don't get a stack overflow error.
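A minimal sketch of that fix under the asker's setup (it assumes files is a Seq[String] of paths, as in the question; the variable names are illustrative):

// Build each RDD first, then union them in a single step.
// sc.union creates one UnionRDD over all inputs, so the lineage
// stays one level deep instead of 4000 nested RDD.union steps.
val rdds = files.map(path => sc.textFile(path))
val collection = sc.union(rdds)  // SparkContext.union, not RDD.union

Computing partitions on this flat UnionRDD walks a list of 4000 children iteratively, rather than recursing through 4000 nested UnionRDDs as in the stack trace above.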