在闭包之外调用函数时会出现奇怪的行为:
任务不可序列化:java.io.NotSerializableException:testing
问题是我需要在类中的代码而不是对象.知道为什么会这样吗?Scala对象是否已序列化(默认?)?
这是一个有效的代码示例:
object working extends App {
val list = List(1,2,3)
val rddList = Spark.ctx.parallelize(list)
//calling function outside closure
val after = rddList.map(someFunc(_))
def someFunc(a:Int) = a+1
after.collect().map(println(_))
}
Run Code Online (Sandbox Code Playgroud)
这是一个非工作的例子:
object NOTworking extends App {
new testing().doIT
}
//adding extends Serializable wont help
class testing {
val list = List(1,2,3)
val rddList = Spark.ctx.parallelize(list)
def doIT = {
//again calling the fucntion someFunc
val after = rddList.map(someFunc(_))
//this will crash (spark lazy)
after.collect().map(println(_))
}
def someFunc(a:Int) …Run Code Online (Sandbox Code Playgroud) 嗨,我实际上使用Spark SQL hiveContext.sql(),它使用查询组,我遇到了OOM问题.因此,考虑将spark.sql.shuffle.partitions200的默认值增加到1000,但它没有帮助.请纠正我,如果我错了,这个分区将共享数据shuffle load,所以分区更少数据保持.请指导我是Spark新手.我正在使用Spark 1.4.0,我有大约1TB的未压缩数据,可以使用hiveContext.sql()group by queries 进行处理.
我最近在我的服务器上使用PySpark与Ipython一起使用24个CPU和32GB RAM.它只能在一台机器上运行.在我的过程中,我想收集大量数据,如下面的代码所示:
train_dataRDD = (train.map(lambda x:getTagsAndText(x))
.filter(lambda x:x[-1]!=[])
.flatMap(lambda (x,text,tags): [(tag,(x,text)) for tag in tags])
.groupByKey()
.mapValues(list))
Run Code Online (Sandbox Code Playgroud)
当我做
training_data = train_dataRDD.collectAsMap()
Run Code Online (Sandbox Code Playgroud)
它给了我outOfMemory错误.Java heap Space.此外,我在此错误后无法对Spark执行任何操作,因为它失去了与Java的连接.它给出了Py4JNetworkError: Cannot connect to the java server.
看起来堆空间很小.如何将其设置为更大的限制?
编辑:
我在跑步之前尝试过的事情:
sc._conf.set('spark.executor.memory','32g').set('spark.driver.memory','32g').set('spark.driver.maxResultsSize','0')
我按照此处的文档更改了spark选项(如果你执行ctrl-f并搜索spark.executor.extraJavaOptions):http://spark.apache.org/docs/1.2.1/configuration.html
它说我可以通过设置spark.executor.memory选项来避免OOM.我做了同样的事情,但似乎没有工作.
我使用以下命令在本地模式下使用Spark 2.0调用Pyspark:
pyspark --executor-memory 4g --driver-memory 4g
Run Code Online (Sandbox Code Playgroud)
输入数据帧正在从tsv文件中读取,并具有580 K x 28列.我正在对数据帧进行一些操作,然后我尝试将其导出到tsv文件,我收到此错误.
df.coalesce(1).write.save("sample.tsv",format = "csv",header = 'true', delimiter = '\t')
Run Code Online (Sandbox Code Playgroud)
任何指针如何摆脱这个错误.我可以轻松显示df或计算行数.
输出数据帧为3100行,共23列
错误:
Job aborted due to stage failure: Task 0 in stage 70.0 failed 1 times, most recent failure: Lost task 0.0 in stage 70.0 (TID 1073, localhost): org.apache.spark.SparkException: Task failed while writing rows
at org.apache.spark.sql.execution.datasources.DefaultWriterContainer.writeRows(WriterContainer.scala:261)
at org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelationCommand$$anonfun$run$1$$anonfun$apply$mcV$sp$1.apply(InsertIntoHadoopFsRelationCommand.scala:143)
at org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelationCommand$$anonfun$run$1$$anonfun$apply$mcV$sp$1.apply(InsertIntoHadoopFsRelationCommand.scala:143)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:70)
at org.apache.spark.scheduler.Task.run(Task.scala:85)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:274)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)
Caused by: java.lang.OutOfMemoryError: Unable to acquire 100 bytes …Run Code Online (Sandbox Code Playgroud) 我已经构建了Spark和Flink k-means应用程序.我的测试用例是在3节点集群上对100万个点进行聚类.
当内存中的瓶颈开始时,Flink开始外包到磁盘并且工作缓慢但工作正常.但是,如果内存已满并且再次启动(无限循环?),Spark会丢失执行程序.
我试着在邮件列表的帮助下自定义内存设置,谢谢.但Spark仍然无效.
是否有必要设置任何配置?我的意思是Flink工作的内存很低,Spark也必须能够; 或不?
我知道有很多关于Spark的内存错误的问题,但我还没有找到解决方案.
我有一个简单的工作流程:
filter 下到一小部分行 select 一小部分列collect进入驱动程序节点(所以我可以做其他操作R)当我运行上面然后cache表格来激发内存它需要<2GB - 与我的集群可用的内存相比很小 - 然后当我尝试collect将数据发送到我的驱动程序节点时,我收到OOM错误.
我试过运行以下设置:
对于这些我都玩过的多种配置executor.memory,driver.memory以及driver.maxResultSize覆盖全系列我的可用内存中的可能值,但始终我结束了一个内存不足的错误的collect阶段; 或者
java.lang.OutOfMemoryError: Java heap space,
java.lang.OutOfMemoryError : GC overhead limit exceeded或者
Error in invoke_method.spark_shell_connection(spark_connection(jobj), :
No status is returned.(sparklyr表示存储器问题的错误).
根据我对Spark的[有限]理解,在收集之前缓存一个表应强制进行所有计算 - 即如果表在缓存<2GB后快乐地坐在内存中,那么我不需要超过2GB的内存来收集它进入驱动程序节点.
请注意,这个问题的答案有一些我尚未尝试的建议,但这些可能会影响性能(例如序列化RDD),所以如果可能的话,我们希望避免使用.
我的问题:
谢谢
编辑:请注意以下@ Shaido的评论,cache通过Sparklyr 调用"通过执行count(*)表格来强制数据加载到内存中"[来自Sparklyr文档] - …
我试图使用本指南在EC2上使用Spark主机对常见爬网数据进行简单转换,我的代码如下所示:
package ccminer
import org.apache.spark.SparkContext
import org.apache.spark.SparkContext._
object ccminer {
val english = "english|en|eng"
val spanish = "es|esp|spa|spanish|espanol"
val turkish = "turkish|tr|tur|turc"
val greek = "greek|el|ell"
val italian = "italian|it|ita|italien"
val all = (english :: spanish :: turkish :: greek :: italian :: Nil).mkString("|")
def langIndep(s: String) = s.toLowerCase().replaceAll(all, "*")
def main(args: Array[String]): Unit = {
if (args.length != 3) {
System.err.println("Bad command line")
System.exit(-1)
}
val cluster = "spark://???"
val sc = new SparkContext(cluster, "Common Crawl Miner", …Run Code Online (Sandbox Code Playgroud) 我正在处理一些奇怪的错误消息,我认为这些消息归结为内存问题,但我很难将其固定下来并可以使用专家的一些指导.
我有一台2机Spark(1.0.1)集群.两台机器都有8个核心; 一个有16GB内存,另一个有32GB(这是主机).我的应用涉及计算图像中的成对像素亲和力,尽管到目前为止我测试过的图像只有1920x1200,小到16x16.
我确实需要更改一些内存和并行设置,否则我会得到明确的OutOfMemoryExceptions.在spark-default.conf中:
spark.executor.memory 14g
spark.default.parallelism 32
spark.akka.frameSize 1000
Run Code Online (Sandbox Code Playgroud)
在spark-env.sh中:
SPARK_DRIVER_MEMORY=10G
Run Code Online (Sandbox Code Playgroud)
但是,通过这些设置,除了丢失的执行程序之外,我还得到了一堆关于"丢失的TID"(没有任务成功完成)的WARN语句,这些语句重复4次,直到我最终得到以下错误消息并崩溃:
14/07/18 12:06:20 INFO TaskSchedulerImpl: Cancelling stage 0
14/07/18 12:06:20 INFO DAGScheduler: Failed to run collect at /home/user/Programming/PySpark-Affinities/affinity.py:243
Traceback (most recent call last):
File "/home/user/Programming/PySpark-Affinities/affinity.py", line 243, in <module>
lambda x: np.abs(IMAGE.value[x[0]] - IMAGE.value[x[1]])
File "/net/antonin/home/user/Spark/spark-1.0.1-bin-hadoop2/python/pyspark/rdd.py", line 583, in collect
bytesInJava = self._jrdd.collect().iterator()
File "/net/antonin/home/user/Spark/spark-1.0.1-bin-hadoop2/python/lib/py4j-0.8.1-src.zip/py4j/java_gateway.py", line 537, in __call__
File "/net/antonin/home/user/Spark/spark-1.0.1-bin-hadoop2/python/lib/py4j-0.8.1-src.zip/py4j/protocol.py", line 300, in get_return_value
py4j.protocol.Py4JJavaError: An error occurred while calling o27.collect.
: org.apache.spark.SparkException: Job aborted …Run Code Online (Sandbox Code Playgroud) 我有一个包含150 G txt文件的文件夹(大约700个文件,平均每个200 MB).
我正在使用scala处理文件并最终计算一些聚合统计信息.我看到两种可行的方法:
我倾向于第二种方法,因为它看起来更干净(不需要特定于并行化的代码),但我想知道我的方案是否适合我的硬件和数据所施加的限制.我有一个工作站,有16个线程和64 GB RAM可用(因此并行化将严格地在不同处理器核心之间本地化).我可能会在以后使用更多计算机扩展基础架构,但是现在我只想专注于调整这一个工作站场景的设置.
我正在使用的代码: - 读取TSV文件,并将有意义的数据提取到(String,String,String)三元组 - 然后执行一些过滤,映射和分组 - 最后,减少数据并计算一些聚合
我已经能够用一个单一的文件(〜200 MB的数据)来运行该代码,但是我收到java.lang.OutOfMemoryError:GC开销超过限制和/或Java进行添加更多的数据时,堆异常(在应用程序中断了6GB的数据,但我想将它与150 GB的数据一起使用).
我想我必须调整一些参数才能使其工作.我将不胜感激任何有关如何解决此问题的提示(如何调试内存需求).我已经尝试增加'spark.executor.memory'并使用较少数量的内核(理性的是每个内核需要一些堆空间),但这并没有解决我的问题.
我不需要解决方案非常快(如果需要,它可以轻松运行几个小时甚至几天).我也没有缓存任何数据,但最后只是将它们保存到文件系统中.如果您认为使用手动并行化方法更可行,我也可以这样做.
我在一个相当小的数据集上做了一个简单的组(HDFS中的80个文件,总共几个演出).我在一个纱线集群中的8台低内存机器上运行Spark,即:
spark-submit ... --master yarn-client --num-executors 8 --executor-memory 3000m --executor-cores 1
数据集由长度为500-2000的字符串组成.
我正在尝试做一个简单的groupByKey(见下文),但它失败并出现java.lang.OutOfMemoryError: GC overhead limit exceeded异常
val keyvals = sc.newAPIHadoopFile("hdfs://...")
.map( someobj.produceKeyValTuple )
keyvals.groupByKey().count()
Run Code Online (Sandbox Code Playgroud)
我可以reduceByKey毫无问题地计算组大小,确保自己问题不是由一个过大的组引起的,也不是由过多的组引起的:
keyvals.map(s => (s._1, 1)).reduceByKey((a,b) => a+b).collect().foreach(println)
// produces:
// (key1,139368)
// (key2,35335)
// (key3,392744)
// ...
// (key13,197941)
Run Code Online (Sandbox Code Playgroud)
我尝试过重新格式化,重组和增加groupBy并行度:
keyvals.groupByKey(24).count // fails
keyvals.groupByKey(3000).count // fails
keyvals.coalesce(24, true).groupByKey(24).count // fails
keyvals.coalesce(3000, true).groupByKey(3000).count // fails
keyvals.coalesce(24, false).groupByKey(24).count // fails
keyvals.coalesce(3000, false).groupByKey(3000).count // fails
Run Code Online (Sandbox Code Playgroud)
我试着玩弄spark.default.parallelism,并增加spark.shuffle.memoryFraction至0.8同时降低 …
apache-spark ×10
memory ×3
scala ×3
pyspark ×2
python ×2
amazon-ec2 ×1
apache-flink ×1
hadoop ×1
heap-memory ×1
java ×1
sparklyr ×1
typesafe ×1