16个任务(1048.5 MB)的序列化结果总大小大于spark.driver.maxResultSize(1024.0 MB)

Mar*_*kus 10 python apache-spark pyspark spark-dataframe

当我添加--conf spark.driver.maxResultSize=2050到我的spark-submit命令时,我收到以下错误.

17/12/27 18:33:19 ERROR TransportResponseHandler: Still have 1 requests outstanding when connection from /XXX.XX.XXX.XX:36245 is closed
17/12/27 18:33:19 WARN Executor: Issue communicating with driver in heartbeater
org.apache.spark.SparkException: Exception thrown in awaitResult:
        at org.apache.spark.util.ThreadUtils$.awaitResult(ThreadUtils.scala:205)
        at org.apache.spark.rpc.RpcTimeout.awaitResult(RpcTimeout.scala:75)
        at org.apache.spark.rpc.RpcEndpointRef.askSync(RpcEndpointRef.scala:92)
        at org.apache.spark.executor.Executor.org$apache$spark$executor$Executor$$reportHeartBeat(Executor.scala:726)
        at org.apache.spark.executor.Executor$$anon$2$$anonfun$run$1.apply$mcV$sp(Executor.scala:755)
        at org.apache.spark.executor.Executor$$anon$2$$anonfun$run$1.apply(Executor.scala:755)
        at org.apache.spark.executor.Executor$$anon$2$$anonfun$run$1.apply(Executor.scala:755)
        at org.apache.spark.util.Utils$.logUncaughtExceptions(Utils.scala:1954)
        at org.apache.spark.executor.Executor$$anon$2.run(Executor.scala:755)
        at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
        at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:308)
        at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:180)
        at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:294)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
        at java.lang.Thread.run(Thread.java:748)
Caused by: java.io.IOException: Connection from /XXX.XX.XXX.XX:36245 closed
        at org.apache.spark.network.client.TransportResponseHandler.channelInactive(TransportResponseHandler.java:146)
Run Code Online (Sandbox Code Playgroud)

添加此配置的原因是错误:

py4j.protocol.Py4JJavaError: An error occurred while calling o171.collectToPython.
: org.apache.spark.SparkException: Job aborted due to stage failure: Total size of serialized results of 16 tasks (1048.5 MB) is bigger than spark.driver.maxResultSize (1024.0 MB)
Run Code Online (Sandbox Code Playgroud)

因此,我增加到maxResultSize2.5 Gb,但无论如何Spark工作都失败了(上面显示的错误).如何解决这个问题?

Rya*_*ier 18

看起来问题是您尝试撤回到驱动程序的数据量太大.您很可能使用collect方法从DataFrame/RDD中检索所有值.该驱动程序是一个过程,通过收集数据框你拉你已经在集群回一个节点的分布式数据的. 这违背了分发它的目的! 只有在将数据减少到可管理的数量后才能执行此操作.

您有两种选择:

1)如果你真的需要处理所有这些数据,那么你应该把它留在执行者身上.使用HDFSParquet以分布式方式保存数据,并使用Spark方法处理群集上的数据,而不是尝试将其全部收回到一个位置.

2)如果您确实需要将数据恢复到驱动程序,则应检查是否确实需要所有数据.如果您只需要汇总统计信息,那么在调用collect之前在执行程序上计算出来.或者,如果您只需要前100名结果,那么只收集前100名.


vj *_*san 14

原因:由 RDD 的 collect() 之类的操作引起,这些操作将大量数据发送给驱动程序

解决方案:由 SparkConf 设置:conf.set("spark.driver.maxResultSize", "4g") 或由 spark-defaults.conf 设置:spark.driver.maxResultSize 4g 或在调用 spark-submit 时设置:--conf spark.driver.maxResultSize=4g