我正在尝试实现一个在Spark之前工作正常的Hadoop Map/Reduce作业.Spark应用程序定义如下:
val data = spark.textFile(file, 2).cache()
val result = data
.map(//some pre-processing)
.map(docWeightPar => (docWeightPar(0),docWeightPar(1))))
.flatMap(line => MyFunctions.combine(line))
.reduceByKey( _ + _)
Run Code Online (Sandbox Code Playgroud)
哪里MyFunctions.combine是
def combine(tuples: Array[(String, String)]): IndexedSeq[(String,Double)] =
for (i <- 0 to tuples.length - 2;
j <- 1 to tuples.length - 1
) yield (toKey(tuples(i)._1,tuples(j)._1),tuples(i)._2.toDouble * tuples(j)._2.toDouble)
Run Code Online (Sandbox Code Playgroud)
combine如果用于输入的列表很大,则该函数会生成大量映射键,这就是抛出异常的位置.
在Hadoop Map Reduce设置中,我没有遇到任何问题,因为这是combine函数产生的点,Hadoop将映射对写入磁盘.Spark似乎会将所有内容保留在内存中,直到它爆炸为止java.lang.OutOfMemoryError: GC overhead limit exceeded.
我可能正在做一些非常基本的错误,但我找不到任何关于如何从这个方面挺身而出的指示,我想知道如何避免这种情况.由于我是Scala和Spark的总菜鸟,我不确定问题是来自一个还是来自另一个,或两者兼而有之.我目前正在尝试在我自己的笔记本电脑上运行这个程序,它适用于tuples数组长度不长的输入.提前致谢.
首先让我指出我对Spark和Scala都很陌生.我试图通过尝试迁移我过去做过的Hadoop Map/Reduce Jobs之一来试图调查承诺的Spark性能.这项工作需要花费14分钟在Hadoop上使用3x r3.2xlarge机器输入16个压缩的bzip文件,每个文件170mb.我把它翻译成Scala/Spark,我可以把它变成这样的东西:
val conceptData = spark.textFile(inputPath)
val result = conceptData.repartition(60).cache()
.map(line => {val metrics = JsonUtil.fromJson[ArticleMetrics](line); (metrics.source, metrics.data.get("entities").get)})
.flatMap(metrics => metrics._2.map(t => (t._1,(1,List((metrics._1,t._2.head))))))
.reduceByKey((a,b) => combine(a,b))
.map(t => t._1 + "\t" + t._2._1 + "\t" + print(t._2._2))
result.saveAsTextFile(outputPath)
def print(tuples: List[(String, Any)]): String =
{
tuples.map(l => l._1 + "\u200e" + l._2).reduce(_ + "\u200f" + _)
}
def combine(a: (Int, List[(String, Any)]), b: (Int, List[(String, Any)])): (Int, List[(String, Any)]) =
{
(a._1 + b._1,a._2 ++ b._2)
}
object …Run Code Online (Sandbox Code Playgroud) 我正在运行一个管道来处理我在 Spark 上的数据。当我的 Executors 接近存储内存限制时,他们似乎时不时地死去。工作继续并最终完成,但这是正常行为吗?我应该做些什么来防止这种情况发生?每次发生这种情况时,作业都会挂起一段时间,直到(我在这里猜测)YARN 为作业提供了一些新的执行程序以继续。