我在一个相当小的数据集上做了一个简单的组(HDFS中的80个文件,总共几个演出).我在一个纱线集群中的8台低内存机器上运行Spark,即:
spark-submit ... --master yarn-client --num-executors 8 --executor-memory 3000m --executor-cores 1
数据集由长度为500-2000的字符串组成.
我正在尝试做一个简单的groupByKey(见下文),但它失败并出现java.lang.OutOfMemoryError: GC overhead limit exceeded异常
val keyvals = sc.newAPIHadoopFile("hdfs://...")
.map( someobj.produceKeyValTuple )
keyvals.groupByKey().count()
Run Code Online (Sandbox Code Playgroud)
我可以reduceByKey毫无问题地计算组大小,确保自己问题不是由一个过大的组引起的,也不是由过多的组引起的:
keyvals.map(s => (s._1, 1)).reduceByKey((a,b) => a+b).collect().foreach(println)
// produces:
// (key1,139368)
// (key2,35335)
// (key3,392744)
// ...
// (key13,197941)
Run Code Online (Sandbox Code Playgroud)
我尝试过重新格式化,重组和增加groupBy并行度:
keyvals.groupByKey(24).count // fails
keyvals.groupByKey(3000).count // fails
keyvals.coalesce(24, true).groupByKey(24).count // fails
keyvals.coalesce(3000, true).groupByKey(3000).count // fails
keyvals.coalesce(24, false).groupByKey(24).count // fails
keyvals.coalesce(3000, false).groupByKey(3000).count // fails
Run Code Online (Sandbox Code Playgroud)
我试着玩弄spark.default.parallelism,并增加spark.shuffle.memoryFraction至0.8同时降低spark.storage.memoryFraction至0.1
失败阶段(计数)将在3000的任务2999上失败.
我似乎无法找到任何暗示groupBy不应该只溢出到磁盘而不是将内容保存在内存中的东西,但我无法让它正常工作,即使在相当小的数据集上也是如此.这应该不是这样,我一定做错了,但我不知道从哪里开始调试!
Patrick Wendell 在邮件列表上详细介绍了groupBy运算符的细节.外卖信息如下:
在一个分区内,东西会溢出[...]此溢出只能在当前的密钥上发生.目前在密钥内不会发生溢出.[...]在GroupBy的一个密钥内溢出可能最终会在Spark的下一个版本中发布,Spark 1.2.[...]如果目标实际上只是将与每个组关联的所有值写入磁盘,并且与单个组关联的值大于适合内存,则现在无法使用groupBy运算符完成此操作.
他进一步建议一个解决方法:
解决此问题的最佳方法取决于您尝试对下游数据执行的操作.通常,方法涉及对任何非常大的组进行细分,例如,将小范围(1-10)中的散列值附加到大键.然后,您的下游代码必须处理每个组的聚合部分值.如果您的目标只是在一个大文件上按顺序在磁盘上放置每个组,您也可以
sortByKey使用哈希后缀进行调用.排序函数在Spark 1.1中进行外部化(处于预发布状态).
| 归档时间: |
|
| 查看次数: |
2978 次 |
| 最近记录: |