作为spark作业提交时,Spark RDD映射中的NullPointerException

csc*_*can 4 hadoop scala distributed-computing bigdata apache-spark

我们正在尝试提交一个火花工作(火花2.0,hadoop 2.7.2),但由于某种原因,我们在EMR中收到了相当神秘的NPE.一切都像scala程序一样运行,所以我们不确定是什么导致了这个问题.这是堆栈跟踪:

18:02:55,271 ERROR Utils:91 - 在org.apache.spark.sql.catalyst的org.apache.spark.sql.catalyst.expressions.GeneratedClass $ GeneratedIterator.agg_doAggregateWithKeys $(未知来源)中止任务java.lang.NullPointerException .expressions.GeneratedClass $ GeneratedIterator.processNext(未知来源)org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)org.apache.spark.sql.execution.WholeStageCodegenExec $$ anonfun $ 8 $ $ anon $ 1.hasNext(WholeStageCodegenExec.scala:370)at scala.collection.Iterator $$ anon $ 12.hasNext(Iterator.scala:438)at org.apache.spark.sql.execution.datasources.DefaultWriterContainer $$ anonfun $ writeRows $ 1.apply $ mcV $ sp(WriterContainer.scala:253)位于org.apache.spark的org.apache.spark.sql.execution.datasources.DefaultWriterContainer $$ anonfun $ writeRows $ 1.apply(WriterContainer.scala:252). sql.execution.datasources.DefaultWriterContainer $$ anonfun $ writeRows $ 1.apply(WriterContainer.scala:252)at org.apache.spark.util.Utils $ .tryWithSafeFinallyA ndFailureCallbacks(Utils.scala:1325)org.apache.spark.sql.execution.datasources.DefaultWriterContainer.writeRows(WriterContainer.scala:258)at org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelationCommand $$ anonfun $ run $ 1 $$ anonfun $应用$ mcV $ sp $ 1.apply(InsertIntoHadoopFsRelationCommand.scala:143)在org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelationCommand $$ anonfun $ run $ 1 $$ anonfun $ apply $ mcV $ sp $ 1 .apply(InsertIntoHadoopFsRelationCommand.scala:143)org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:70)atg.apache.spark.scheduler.Task.run(Task.scala:85)at org. java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)中的java.util.concurrent.ThreadPoolExecutor $ Worker.run(ThreadPoolExecutor.)中的apache.spark.executor.Executor $ TaskRunner.run(Executor.scala:274). java:617)at java.lang.Thread.run(Thread.java:745)

据我们所知,这种情况发生在以下方法中:

def process(dataFrame: DataFrame, S3bucket: String) = {
  dataFrame.map(row =>
      "text|label"
  ).coalesce(1).write.mode(SaveMode.Overwrite).text(S3bucket)
}
Run Code Online (Sandbox Code Playgroud)

我们将它缩小到地图功能,因为这在作为spark工作提交时有效:

def process(dataFrame: DataFrame, S3bucket: String) = {
  dataFrame.coalesce(1).write.mode(SaveMode.Overwrite).text(S3bucket)
}
Run Code Online (Sandbox Code Playgroud)

有谁知道可能导致这个问题的原因是什么?另外,我们如何解决它?我们很难过.

gsa*_*ras 6

我认为NullPointerException当工人试图访问SparkContext仅存在于驱动程序而不是工作者上的对象时,工作人员会抛出该工具.

coalesce()重新分配您的数据.当您仅请求一个分区时,它将尝试挤压一个分区中的所有数据*.这可能会给应用程序的内存占用带来很大压力.

通常,最好不要仅在1中缩小分区.

有关更多信息,请阅读:使用saveAsTextFile它的Spark NullPointerException.


  • 当我们只用五条记录进行测试时发生了这个错误 - 我不认为它与内存使用有关. (2认同)