Piy*_*ush 2 python apache-spark pyspark
我用 python 编写了一个运行正常的 Spark 程序。
但是,它在内存消耗方面效率低下,我正在尝试对其进行优化。我在 AWS EMR 上运行它,而 EMR 正在杀死消耗太多内存的工作。
Lost executor 11 on ip-*****: Container killed by YARN for exceeding memory limits. 11.4 GB of 10.4 GB physical memory used. Consider boosting spark.yarn.executor.memoryOverhead.
Run Code Online (Sandbox Code Playgroud)
我相信这个内存问题是由于我在多个实例中收集我的 RDD(即使用 .collect() ),因为在后期阶段,我需要测试由这些 RDD 组成的列表中是否存在某些值.
所以,目前我的代码是这样的:
myrdd = data.map(lambda word: (word,1)) \
.reduceByKey(lambda a,b: a+b) \
.filter(lambda (a, b): b >= 5) \
.map(lambda (a,b) : a) \
.collect()
Run Code Online (Sandbox Code Playgroud)
稍后在代码中
if word in myrdd:
mylist.append(word)
myrdd2 = data2.map(lambda word: (word,1)) \
.reduceByKey(lambda a,b: a+b) \
.filter(lambda (a, b): b >= 5) \
.map(lambda (a,b) : a) \
.collect()
if word in myrdd2:
mylist2.append(word)
Run Code Online (Sandbox Code Playgroud)
然后我多次重复这种模式。
有没有办法做手术
if word in myrdd:
do something
Run Code Online (Sandbox Code Playgroud)
不先收集rdd?
有没有像 rdd.contains() 这样的函数?
PS:我没有在内存中缓存任何东西。我的 spark 上下文如下所示:
jobName = "wordcount"
sc = SparkContext(appName = jobName)
......
......
sc.stop()
Run Code Online (Sandbox Code Playgroud)
来自 YARN 的错误消息说这collect不是问题,因为您的执行程序(而不是驱动程序)存在内存问题。
首先,尝试遵循错误消息建议并提升spark.yarn.executor.memoryOverhead- 在 YARN 上运行 pyspark 时,您可以告诉 YARN 为 Python 工作进程内存分配更大的容器。
接下来,查看执行器需要大量内存的操作。您使用reduceByKey,也许您可以增加分区数量,使它们在使用的内存方面更小。查看numPartitions参数:http : //spark.apache.org/docs/latest/api/python/pyspark.html#pyspark.RDD.reduceByKey
最后,如果您想检查 rdd 是否包含某个值,则只需按此值过滤并使用countor检查它first,例如:
looking_for = "....."
contains = rdd.filter(lambda a: a == looking_for).count() > 0
Run Code Online (Sandbox Code Playgroud)
| 归档时间: |
|
| 查看次数: |
8010 次 |
| 最近记录: |