相关疑难解决方法(0)

任务不可序列化:java.io.NotSerializableException仅在类而不是对象上调用闭包外的函数时

在闭包之外调用函数时会出现奇怪的行为:

  • 当函数在一个对象中时,一切正常
  • 当函数在类中时获取:

任务不可序列化:java.io.NotSerializableException:testing

问题是我需要在类中的代码而不是对象.知道为什么会这样吗?Scala对象是否已序列化(默认?)?

这是一个有效的代码示例:

object working extends App {
    val list = List(1,2,3)

    val rddList = Spark.ctx.parallelize(list)
    //calling function outside closure 
    val after = rddList.map(someFunc(_))

    def someFunc(a:Int)  = a+1

    after.collect().map(println(_))
}
Run Code Online (Sandbox Code Playgroud)

这是一个非工作的例子:

object NOTworking extends App {
  new testing().doIT
}

//adding extends Serializable wont help
class testing {  
  val list = List(1,2,3)  
  val rddList = Spark.ctx.parallelize(list)

  def doIT =  {
    //again calling the fucntion someFunc 
    val after = rddList.map(someFunc(_))
    //this will crash (spark lazy)
    after.collect().map(println(_))
  }

  def someFunc(a:Int) …
Run Code Online (Sandbox Code Playgroud)

serialization scala typesafe apache-spark

211
推荐指数
6
解决办法
15万
查看次数

spark.sql.shuffle.partitions的最佳值应该是什么,或者在使用Spark SQL时如何增加分区?

嗨,我实际上使用Spark SQL hiveContext.sql(),它使用查询组,我遇到了OOM问题.因此,考虑将spark.sql.shuffle.partitions200的默认值增加到1000,但它没有帮助.请纠正我,如果我错了,这个分区将共享数据shuffle load,所以分区更少数据保持.请指导我是Spark新手.我正在使用Spark 1.4.0,我有大约1TB的未压缩数据,可以使用hiveContext.sql()group by queries 进行处理.

apache-spark apache-spark-sql

36
推荐指数
2
解决办法
3万
查看次数

PySpark:java.lang.OutofMemoryError:Java堆空间

我最近在我的服务器上使用PySpark与Ipython一起使用24个CPU和32GB RAM.它只能在一台机器上运行.在我的过程中,我想收集大量数据,如下面的代码所示:

train_dataRDD = (train.map(lambda x:getTagsAndText(x))
.filter(lambda x:x[-1]!=[])
.flatMap(lambda (x,text,tags): [(tag,(x,text)) for tag in tags])
.groupByKey()
.mapValues(list))
Run Code Online (Sandbox Code Playgroud)

当我做

training_data =  train_dataRDD.collectAsMap()
Run Code Online (Sandbox Code Playgroud)

它给了我outOfMemory错误.Java heap Space.此外,我在此错误后无法对Spark执行任何操作,因为它失去了与Java的连接.它给出了Py4JNetworkError: Cannot connect to the java server.

看起来堆空间很小.如何将其设置为更大的限制?

编辑:

我在跑步之前尝试过的事情: sc._conf.set('spark.executor.memory','32g').set('spark.driver.memory','32g').set('spark.driver.maxResultsSize','0')

我按照此处的文档更改了spark选项(如果你执行ctrl-f并搜索spark.executor.extraJavaOptions):http://spark.apache.org/docs/1.2.1/configuration.html

它说我可以通过设置spark.executor.memory选项来避免OOM.我做了同样的事情,但似乎没有工作.

java heap-memory out-of-memory apache-spark pyspark

34
推荐指数
2
解决办法
4万
查看次数

java.lang.OutOfMemoryError:无法获取100个字节的内存,得到0

我使用以下命令在本地模式下使用Spark 2.0调用Pyspark:

pyspark --executor-memory 4g --driver-memory 4g
Run Code Online (Sandbox Code Playgroud)

输入数据帧正在从tsv文件中读取,并具有5​​80 K x 28列.我正在对数据帧进行一些操作,然后我尝试将其导出到tsv文件,我收到此错误.

df.coalesce(1).write.save("sample.tsv",format = "csv",header = 'true', delimiter = '\t')
Run Code Online (Sandbox Code Playgroud)

任何指针如何摆脱这个错误.我可以轻松显示df或计算行数.

输出数据帧为3100行,共23列

错误:

Job aborted due to stage failure: Task 0 in stage 70.0 failed 1 times, most recent failure: Lost task 0.0 in stage 70.0 (TID 1073, localhost): org.apache.spark.SparkException: Task failed while writing rows
    at org.apache.spark.sql.execution.datasources.DefaultWriterContainer.writeRows(WriterContainer.scala:261)
    at org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelationCommand$$anonfun$run$1$$anonfun$apply$mcV$sp$1.apply(InsertIntoHadoopFsRelationCommand.scala:143)
    at org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelationCommand$$anonfun$run$1$$anonfun$apply$mcV$sp$1.apply(InsertIntoHadoopFsRelationCommand.scala:143)
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:70)
    at org.apache.spark.scheduler.Task.run(Task.scala:85)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:274)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    at java.lang.Thread.run(Thread.java:745)
Caused by: java.lang.OutOfMemoryError: Unable to acquire 100 bytes …
Run Code Online (Sandbox Code Playgroud)

python memory hadoop apache-spark pyspark

16
推荐指数
4
解决办法
1万
查看次数

Spark vs Flink内存不足

我已经构建了Spark和Flink k-means应用程序.我的测试用例是在3节点集群上对100万个点进行聚类.

当内存中的瓶颈开始时,Flink开始外包到磁盘并且工作缓慢但工作正常.但是,如果内存已满并且再次启动(无限循环?),Spark会丢失执行程序.

我试着在邮件列表的帮助下自定义内存设置,谢谢.但Spark仍然无效.

是否有必要设置任何配置?我的意思是Flink工作的内存很低,Spark也必须能够; 或不?

memory apache-spark apache-flink

14
推荐指数
1
解决办法
2384
查看次数

从Spark群集中收集数据时出现内存不足错误

我知道有很多关于Spark的内存错误的问题,但我还没有找到解决方案.

我有一个简单的工作流程:

  1. 从Amazon S3读入ORC文件
  2. filter 下到一小部分行
  3. select 一小部分列
  4. collect进入驱动程序节点(所以我可以做其他操作R)

当我运行上面然后cache表格来激发内存它需要<2GB - 与我的集群可用的内存相比很小 - 然后当我尝试collect将数据发送到我的驱动程序节点时,我收到OOM错误.

我试过运行以下设置:

  • 具有32核和244GB内存的计算机上的本地模式
  • 独立模式,具有10 x 6.2 GB执行程序和61 GB驱动程序节点

对于这些我都玩过的多种配置executor.memory,driver.memory以及driver.maxResultSize覆盖全系列我的可用内存中的可能值,但始终我结束了一个内存不足的错误的collect阶段; 或者 java.lang.OutOfMemoryError: Java heap space,
java.lang.OutOfMemoryError : GC overhead limit exceeded或者 Error in invoke_method.spark_shell_connection(spark_connection(jobj), : No status is returned.(sparklyr表示存储器问题的错误).

根据我对Spark的[有限]理解,在收集之前缓存一个表应强制进行所有计算 - 即如果表在缓存<2GB后快乐地坐在内存中,那么我不需要超过2GB的内存来收集它进入驱动程序节点.

请注意,这个问题的答案有一些我尚未尝试的建议,但这些可能会影响性能(例如序列化RDD),所以如果可能的话,我们希望避免使用.

我的问题:

  1. 如何在缓存后占用如此少空间的数据帧会导致内存问题?
  2. 在我继续讨论可能影响性能的其他选项之前,有什么显而易见的东西可以检查/更改/排除故障以帮助解决问题吗?

谢谢

编辑:请注意以下@ Shaido的评论,cache通过Sparklyr 调用"通过执行count(*)表格来强制数据加载到内存中"[来自Sparklyr文档] - …

memory apache-spark sparklyr

13
推荐指数
1
解决办法
8466
查看次数

按键分组时Spark会耗尽内存

我试图使用本指南在EC2上使用Spark主机对常见爬网数据进行简单转换,我的代码如下所示:

package ccminer

import org.apache.spark.SparkContext
import org.apache.spark.SparkContext._

object ccminer {
  val english = "english|en|eng"
  val spanish = "es|esp|spa|spanish|espanol"
  val turkish = "turkish|tr|tur|turc"
  val greek = "greek|el|ell"
  val italian = "italian|it|ita|italien"
  val all = (english :: spanish :: turkish :: greek :: italian :: Nil).mkString("|")

  def langIndep(s: String) = s.toLowerCase().replaceAll(all, "*")

  def main(args: Array[String]): Unit = {
    if (args.length != 3) {
      System.err.println("Bad command line")
      System.exit(-1)
    }

    val cluster = "spark://???"
    val sc = new SparkContext(cluster, "Common Crawl Miner", …
Run Code Online (Sandbox Code Playgroud)

scala amazon-ec2 apache-spark

12
推荐指数
2
解决办法
1万
查看次数

Apache Spark:由于阶段失败导致作业中止:"TID x因未知原因失败"

我正在处理一些奇怪的错误消息,我认为这些消息归结为内存问题,但我很难将其固定下来并可以使用专家的一些指导.

我有一台2机Spark(1.0.1)集群.两台机器都有8个核心; 一个有16GB内存,另一个有32GB(这是主机).我的应用涉及计算图像中的成对像素亲和力,尽管到目前为止我测试过的图像只有1920x1200,小到16x16.

我确实需要更改一些内存和并行设置,否则我会得到明确的OutOfMemoryExceptions.在spark-default.conf中:

spark.executor.memory    14g
spark.default.parallelism    32
spark.akka.frameSize        1000
Run Code Online (Sandbox Code Playgroud)

在spark-env.sh中:

SPARK_DRIVER_MEMORY=10G
Run Code Online (Sandbox Code Playgroud)

但是,通过这些设置,除了丢失的执行程序之外,我还得到了一堆关于"丢失的TID"(没有任务成功完成)的WARN语句,这些语句重复4次,直到我最终得到以下错误消息并崩溃:

14/07/18 12:06:20 INFO TaskSchedulerImpl: Cancelling stage 0
14/07/18 12:06:20 INFO DAGScheduler: Failed to run collect at /home/user/Programming/PySpark-Affinities/affinity.py:243
Traceback (most recent call last):
  File "/home/user/Programming/PySpark-Affinities/affinity.py", line 243, in <module>
    lambda x: np.abs(IMAGE.value[x[0]] - IMAGE.value[x[1]])
  File "/net/antonin/home/user/Spark/spark-1.0.1-bin-hadoop2/python/pyspark/rdd.py", line 583, in collect
    bytesInJava = self._jrdd.collect().iterator()
  File "/net/antonin/home/user/Spark/spark-1.0.1-bin-hadoop2/python/lib/py4j-0.8.1-src.zip/py4j/java_gateway.py", line 537, in __call__
  File "/net/antonin/home/user/Spark/spark-1.0.1-bin-hadoop2/python/lib/py4j-0.8.1-src.zip/py4j/protocol.py", line 300, in get_return_value
py4j.protocol.Py4JJavaError: An error occurred while calling o27.collect.
: org.apache.spark.SparkException: Job aborted …
Run Code Online (Sandbox Code Playgroud)

python apache-spark

11
推荐指数
1
解决办法
1万
查看次数

火花内存不足

我有一个包含150 G txt文件的文件夹(大约700个文件,平均每个200 MB).

我正在使用scala处理文件并最终计算一些聚合统计信息.我看到两种可行的方法:

  • 手动循环遍历所有文件,对每个文件进行计算并最终合并结果
  • 将整个文件夹读取到一个RDD,对此单个RDD执行所有操作,并让spark执行所有并行化

我倾向于第二种方法,因为它看起来更干净(不需要特定于并行化的代码),但我想知道我的方案是否适合我的硬件和数据所施加的限制.我有一个工作站,有16个线程和64 GB RAM可用(因此并行化将严格地在不同处理器核心之间本地化).我可能会在以后使用更多计算机扩展基础架构,但是现在我只想专注于调整这一个工作站场景的设置.

我正在使用的代码: - 读取TSV文件,并将有意义的数据提取到(String,String,String)三元组 - 然后执行一些过滤,映射和分组 - 最后,减少数据并计算一些聚合

我已经能够用一个单一的文件(〜200 MB的数据)来运行该代码,但是我收到java.lang.OutOfMemoryError:GC开销超过限制和/或Java进行添加更多的数据时,堆异常(在应用程序中断了6GB的数据,但我想将它与150 GB的数据一起使用).

我想我必须调整一些参数才能使其工作.我将不胜感激任何有关如何解决此问题的提示(如何调试内存需求).我已经尝试增加'spark.executor.memory'并使用较少数量的内核(理性的是每个内核需要一些堆空间),但这并没有解决我的问题.

我不需要解决方案非常快(如果需要,它可以轻松运行几个小时甚至几天).我也没有缓存任何数据,但最后只是将它们保存到文件系统中.如果您认为使用手动并行化方法更可行,我也可以这样做.

scala apache-spark

10
推荐指数
1
解决办法
6785
查看次数

Spark groupBy OutOfMemory让人痛苦

我在一个相当小的数据集上做了一个简单的组(HDFS中的80个文件,总共几个演出).我在一个纱线集群中的8台低内存机器上运行Spark,即:

spark-submit ... --master yarn-client --num-executors 8 --executor-memory 3000m --executor-cores 1

数据集由长度为500-2000的字符串组成.

我正在尝试做一个简单的groupByKey(见下文),但它失败并出现java.lang.OutOfMemoryError: GC overhead limit exceeded异常

val keyvals = sc.newAPIHadoopFile("hdfs://...")
  .map( someobj.produceKeyValTuple )
keyvals.groupByKey().count()
Run Code Online (Sandbox Code Playgroud)

我可以reduceByKey毫无问题地计算组大小,确保自己问题不是由一个过大的组引起的,也不是由过多的组引起的:

keyvals.map(s => (s._1, 1)).reduceByKey((a,b) => a+b).collect().foreach(println)
// produces:
//  (key1,139368)
//  (key2,35335)
//  (key3,392744)
//  ...
//  (key13,197941)
Run Code Online (Sandbox Code Playgroud)

我尝试过重新格式化,重组和增加groupBy并行度:

keyvals.groupByKey(24).count // fails
keyvals.groupByKey(3000).count // fails
keyvals.coalesce(24, true).groupByKey(24).count // fails
keyvals.coalesce(3000, true).groupByKey(3000).count // fails
keyvals.coalesce(24, false).groupByKey(24).count // fails
keyvals.coalesce(3000, false).groupByKey(3000).count // fails
Run Code Online (Sandbox Code Playgroud)

我试着玩弄spark.default.parallelism,并增加spark.shuffle.memoryFraction0.8同时降低 …

apache-spark

7
推荐指数
1
解决办法
2978
查看次数