anw*_*ian 4 architecture internal apache-spark
我有一个关于火花的非常基本的问题。我通常使用 50 个内核运行 spark 作业。在查看作业进度时,大多数情况下它会显示 50 个并行运行的进程(正如它应该做的那样),但有时它只显示 2 或 4 个并行运行的 spark 进程。像这样:
[Stage 8:================================> (297 + 2) / 500]
Run Code Online (Sandbox Code Playgroud)
正在处理的 RDDrepartitioned位于 100 多个分区上。所以这应该不是问题。
不过我有一个观察。我见过这样的模式,大多数情况下,SparkUI 中的数据局部性显示NODE_LOCAL,而其他时候当所有 50 个进程都在运行时,一些进程显示RACK_LOCAL. 这让我怀疑,这可能是因为在同一节点中处理数据之前缓存了数据以避免网络开销,这会减慢进一步处理的速度。
如果是这种情况,有什么方法可以避免。如果不是这种情况,这里发生了什么?
经过一周或更长时间的努力,我想我已经找到了导致问题的原因。
如果您遇到同样的问题,最好先检查 Spark 实例是否配置良好。有一篇关于它的很棒的cloudera 博客文章。
但是,如果问题不在于配置(就像我的情况一样),那么问题就出在您的代码中。问题是,有时由于不同的原因(倾斜连接、数据源中的分区不均匀等),您正在处理的 RDD 在 2-3 个分区上获取大量数据,而其余分区的数据很少。
为了减少网络上的数据混洗,Spark 尝试让每个执行器处理驻留在该节点本地的数据。所以,2-3个执行器长时间工作,其余的执行器在几毫秒内完成数据。这就是为什么我遇到了我在上面的问题中描述的问题。
调试这个问题的方法是首先检查你的RDD的分区大小。如果一个或几个分区与其他分区相比非常大,那么下一步将是在大分区中查找记录,以便您可以知道,尤其是在倾斜连接的情况下,哪个键正在发生倾斜。我写了一个小函数来调试这个:
from itertools import islice
def check_skewness(df):
sampled_rdd = df.sample(False,0.01).rdd.cache() # Taking just 1% sample for fast processing
l = sampled_rdd.mapPartitionsWithIndex(lambda x,it: [(x,sum(1 for _ in it))]).collect()
max_part = max(l,key=lambda item:item[1])
min_part = min(l,key=lambda item:item[1])
if max_part[1]/min_part[1] > 5: #if difference is greater than 5 times
print 'Partitions Skewed: Largest Partition',max_part,'Smallest Partition',min_part,'\nSample Content of the largest Partition: \n'
print (sampled_rdd.mapPartitionsWithIndex(lambda i, it: islice(it, 0, 5) if i == max_part[0] else []).take(5))
else:
print 'No Skewness: Largest Partition',max_part,'Smallest Partition',min_part
Run Code Online (Sandbox Code Playgroud)
它给了我最小和最大的分区大小,如果这两者之间的差异超过 5 倍,它会打印最大分区的 5 个元素,让您大致了解发生了什么。
一旦你发现问题是偏斜分区,你可以找到一种方法来摆脱那个偏斜的键,或者你可以重新分区你的数据帧,这将迫使它得到均匀分布,你现在会看到所有执行器将在相同的时间内工作,您会看到可怕的 OOM 错误要少得多,并且处理速度也会非常快。
这些只是我作为 Spark 新手的两分钱,我希望 Spark 专家可以为这个问题添加更多内容,因为我认为 Spark 世界中的很多新手经常面临类似的问题。
| 归档时间: |
|
| 查看次数: |
1404 次 |
| 最近记录: |