火花可扩展性:我做错了什么?

sds*_*sds 6 scalability distributed-computing bigdata apache-spark pyspark

我正在使用spark处理数据,它可以处理一天的数据(40G),但是在一周的数据中使用OOM会失败:

import pyspark
import datetime
import operator
sc = pyspark.SparkContext()
sqc = pyspark.sql.SQLContext(sc)
sc.union([sqc.parquetFile(hour.strftime('.....'))
          .map(lambda row:(row.id, row.foo))
          for hour in myrange(beg,end,datetime.timedelta(0,3600))]) \
  .reduceByKey(operator.add).saveAsTextFile("myoutput")
Run Code Online (Sandbox Code Playgroud)

不同ID的数量小于10k.每个ID都很小int.由于太多执行程序因OOM而失败,因此作业失败.当工作成功时(在小输入上),"myoutput"大约是100k.

  1. 我究竟做错了什么?
  2. 我试着更换saveAsTextFilecollect(因为我实际上希望做一些切片和保存之前在python切割),但在行为上是没有任何区别,同样的失败.这是预期的吗?
  3. 我曾经有过reduce(lambda x,y: x.union(y), [sqc.parquetFile(...)...])而不是sc.union- 哪个更好?它有什么不同吗?

该集群有25个节点,内存为825GB,其中224个内核.

调用是spark-submit --master yarn --num-executors 50 --executor-memory 5G.

单个RDD有大约140列并且覆盖一小时的数据,因此一周是168(= 7*24)个RDD的并集.

gsa*_*ras 2

Spark 在扩展时经常会出现内存不足错误。在这些情况下,程序员应该进行微调。或者重新检查您的代码,以确保您没有做任何太多的事情,例如收集驱动程序中的所有大数据这很可能超出内存开销限制,无论您将其设置有多大。

\n\n

要了解发生了什么,您应该意识到当决定杀死超出内存限制的容器时。当容器超出内存开销限制时就会发生这种情况。

\n\n

在调度程序中,您可以检查事件时间线以查看容器发生了什么。如果 Yarn 杀死了一个容器,它将显示为红色,当您将鼠标悬停/单击它时,您将看到类似以下的消息:

\n\n
\n

容器因超出内存限制而被 YARN 终止。已使用 16 GB 物理内存中的 16.9 GB。考虑提高spark.yarn.executor.memoryOverhead。

\n
\n\n

在此输入图像描述

\n\n
\n\n

因此,在这种情况下,您需要关注的是这些配置属性(值是我的集群上的示例):

\n\n
# More executor memory overhead\nspark.yarn.executor.memoryOverhead          4096\n\n# More driver memory overhead\nspark.yarn.driver.memoryOverhead            8192\n\n# Max on my nodes\n#spark.executor.cores                        8\n#spark.executor.memory                       12G\n\n# For the executors\nspark.executor.cores                        6\nspark.executor.memory                       8G\n\n# For the driver\nspark.driver.cores                          6\nspark.driver.memory                         8G\n
Run Code Online (Sandbox Code Playgroud)\n\n

首先要做的就是增加memoryOverhead.

\n\n

在驱动程序中还是在执行程序中?

\n\n

当您从 UI 概览集群时,您可以单击尝试 ID 并检查诊断信息,其中应提及被终止的容器的 ID。如果它与您的AM Container相同,那么它是驱动程序,否则是执行程序。

\n\n
\n\n

这并没有解决问题,现在怎么办?

\n\n

您必须微调所提供的核心数量和堆内存。您会看到将在堆外内存中完成大部分工作,因此您不想为堆提供太多空间,因为这会被浪费。你不想给的太少,因为那样垃圾收集器就会出现问题。回想一下,这些是 JVM。

\n\n

如此处所述,一个工作线程可以托管多个执行程序,因此使用的核心数量会影响每个执行程序拥有的内存量,因此减少 #cores 可能会有所帮助。

\n\n

我把它写在Spark和 Spark \xe2\x80\x93 Container exited with a non-zero exit code 143 的内存开销问题中,更详细,主要是我不会忘记!我没有尝试过的另一个选择是spark.default.parallelism或/and spark.storage.memoryFraction,根据我的经验,它没有帮助。

\n\n
\n\n

您可以按照 sds 提到的方式传递配置标志,或者像这样:

\n\n
spark-submit --properties-file my_properties\n
Run Code Online (Sandbox Code Playgroud)\n\n

其中“my_properties”类似于我上面列出的属性。

\n\n

对于非数值,您可以这样做:

\n\n
spark-submit --conf spark.executor.memory=\'4G\' \n
Run Code Online (Sandbox Code Playgroud)\n