java.lang.OutOfMemoryError:无法获取100个字节的内存,得到0

ML_*_*ion 16 python memory hadoop apache-spark pyspark

我使用以下命令在本地模式下使用Spark 2.0调用Pyspark:

pyspark --executor-memory 4g --driver-memory 4g
Run Code Online (Sandbox Code Playgroud)

输入数据帧正在从tsv文件中读取,并具有5​​80 K x 28列.我正在对数据帧进行一些操作,然后我尝试将其导出到tsv文件,我收到此错误.

df.coalesce(1).write.save("sample.tsv",format = "csv",header = 'true', delimiter = '\t')
Run Code Online (Sandbox Code Playgroud)

任何指针如何摆脱这个错误.我可以轻松显示df或计算行数.

输出数据帧为3100行,共23列

错误:

Job aborted due to stage failure: Task 0 in stage 70.0 failed 1 times, most recent failure: Lost task 0.0 in stage 70.0 (TID 1073, localhost): org.apache.spark.SparkException: Task failed while writing rows
    at org.apache.spark.sql.execution.datasources.DefaultWriterContainer.writeRows(WriterContainer.scala:261)
    at org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelationCommand$$anonfun$run$1$$anonfun$apply$mcV$sp$1.apply(InsertIntoHadoopFsRelationCommand.scala:143)
    at org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelationCommand$$anonfun$run$1$$anonfun$apply$mcV$sp$1.apply(InsertIntoHadoopFsRelationCommand.scala:143)
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:70)
    at org.apache.spark.scheduler.Task.run(Task.scala:85)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:274)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    at java.lang.Thread.run(Thread.java:745)
Caused by: java.lang.OutOfMemoryError: Unable to acquire 100 bytes of memory, got 0
    at org.apache.spark.memory.MemoryConsumer.allocatePage(MemoryConsumer.java:129)
    at org.apache.spark.util.collection.unsafe.sort.UnsafeExternalSorter.acquireNewPageIfNecessary(UnsafeExternalSorter.java:374)
    at org.apache.spark.util.collection.unsafe.sort.UnsafeExternalSorter.insertRecord(UnsafeExternalSorter.java:396)
    at org.apache.spark.sql.execution.UnsafeExternalRowSorter.insertRow(UnsafeExternalRowSorter.java:94)
    at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIterator.sort_addToSorter$(Unknown Source)
    at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIterator.processNext(Unknown Source)
    at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
    at org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$8$$anon$1.hasNext(WholeStageCodegenExec.scala:370)
    at org.apache.spark.sql.execution.WindowExec$$anonfun$15$$anon$1.fetchNextRow(WindowExec.scala:300)
    at org.apache.spark.sql.execution.WindowExec$$anonfun$15$$anon$1.<init>(WindowExec.scala:309)
    at org.apache.spark.sql.execution.WindowExec$$anonfun$15.apply(WindowExec.scala:289)
    at org.apache.spark.sql.execution.WindowExec$$anonfun$15.apply(WindowExec.scala:288)
    at org.apache.spark.rdd.RDD$$anonfun$mapPartitions$1$$anonfun$apply$23.apply(RDD.scala:766)
    at org.apache.spark.rdd.RDD$$anonfun$mapPartitions$1$$anonfun$apply$23.apply(RDD.scala:766)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:319)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:283)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:319)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:283)
    at org.apache.spark.rdd.ZippedPartitionsRDD2.compute(ZippedPartitionsRDD.scala:89)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:319)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:283)
    at org.apache.spark.rdd.ZippedPartitionsRDD2.compute(ZippedPartitionsRDD.scala:89)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:319)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:283)
    at org.apache.spark.rdd.ZippedPartitionsRDD2.compute(ZippedPartitionsRDD.scala:89)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:319)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:283)
    at org.apache.spark.rdd.CoalescedRDD$$anonfun$compute$1.apply(CoalescedRDD.scala:96)
    at org.apache.spark.rdd.CoalescedRDD$$anonfun$compute$1.apply(CoalescedRDD.scala:95)
    at scala.collection.Iterator$$anon$12.nextCur(Iterator.scala:434)
    at scala.collection.Iterator$$anon$12.hasNext(Iterator.scala:440)
    at org.apache.spark.sql.execution.datasources.DefaultWriterContainer$$anonfun$writeRows$1.apply$mcV$sp(WriterContainer.scala:253)
    at org.apache.spark.sql.execution.datasources.DefaultWriterContainer$$anonfun$writeRows$1.apply(WriterContainer.scala:252)
    at org.apache.spark.sql.execution.datasources.DefaultWriterContainer$$anonfun$writeRows$1.apply(WriterContainer.scala:252)
    at org.apache.spark.util.Utils$.tryWithSafeFinallyAndFailureCallbacks(Utils.scala:1325)
    at org.apache.spark.sql.execution.datasources.DefaultWriterContainer.writeRows(WriterContainer.scala:258)
    ... 8 more

Driver stacktrace:
Run Code Online (Sandbox Code Playgroud)

gsa*_*ras 15

我相信这个问题的原因是coalesce(),尽管事实上它避免了完全的混乱(比如重新分区会这样做),但它必须缩小所请求数量的分区中的数据.

在这里,您要求所有数据适合一个分区,因此一个任务(并且只有一个任务)必须处理所有数据,这可能导致其容器受到内存限制.

因此,要么提出比1更多的分区,要么coalesce()在这种情况下避免.


否则,您可以尝试以下链接中提供的解决方案,以增加内存配置:

  1. Spark java.lang.OutOfMemoryError:Java堆空间
  2. 按键分组时Spark会耗尽内存


Mar*_*cok 13

正如其他答案中所述,使用repartition(1)代替coalesce(1). 原因是 repartition(1) 将确保上游处理并行完成(多个任务/分区),而不是仅在一个执行程序上完成。

引用Dataset.coalesce() Spark 文档:

但是,如果您进行了剧烈的合并,例如将 numPartitions = 1,这可能会导致您在比您喜欢的节点数更少的节点上进行计算(例如,在 numPartitions = 1 的情况下只有一个节点)。为避免这种情况,您可以改为调用 repartition(1)。这将添加一个 shuffle 步骤,但意味着当前的上游分区将并行执行(无论当前分区是什么)。


bob*_*o32 11

我的问题确实存在coalesce().我所做的是导出文件而不使用coalesce()镶木地板代替使用df.write.parquet("testP").然后回读文件并导出coalesce(1).

希望它也适合你.


Gan*_*thi 9

就我而言,coalesce(1)repartition(1)Worked 代替。

  • 请考虑在您的答案中添加更多信息 (2认同)