anw*_*ian 6 python apache-spark apache-spark-sql pyspark
我有两个数据框,df1分别有 2200 万条记录和df2200 万条记录。我正在做正确的加入email_address作为关键。
test_join = df2.join(df1, "email_address", how = 'right').cache()
Run Code Online (Sandbox Code Playgroud)
两个数据框中都很少有重复的(如果有的话)电子邮件。连接后,我尝试test_join使用以下代码找到结果数据帧的分区大小:
l = builder.rdd.mapPartitionsWithIndex(lambda x,it: [(x,sum(1 for _ in it))]).collect()
print(max(l,key=lambda item:item[1]),min(l,key=lambda item:item[1]))
Run Code Online (Sandbox Code Playgroud)
结果表明,最大分区比平均分区大小大约大 100 倍。分区大小的这种偏差会在连接后转换和操作中带来性能问题。
我知道我可以在使用命令连接后对其重新分区repartion(num_partitions),但我的问题是为什么我会遇到这种不均匀的分区结果,有什么方法可以首先避免它。
PS:只是为了检查问题是否仅与 email_address 哈希函数有关的假设,我还检查了其他几个连接的分区大小,我还在数字键连接中看到了问题。
@user6910411 你说得对。问题出在我的数据上,输入空电子邮件时遵循了一些愚蠢的约定,这导致了这个倾斜密钥问题。
当我检查了最大隔间的入口后,我才知道里面发生了什么。我发现这种调试技术非常有用,并且我确信这可以帮助面临同样问题的其他人。
顺便说一句,这是我编写的函数,用于查找 RDD 分区的偏度:
from itertools import islice
def check_skewness(df):
sampled_rdd = df.sample(False,0.01).rdd.cache() # Taking just 1% sample, to make processing fast
l = sampled_rdd.mapPartitionsWithIndex(lambda x,it: [(x,sum(1 for _ in it))]).collect()
max_part = max(l,key=lambda item:item[1])
min_part = min(l,key=lambda item:item[1])
if max_part[1]/min_part[1] > 5: #if difference between largest and smallest partition size is greater than 5 times
print 'Partitions Skewed: Largest Partition',max_part,'Smallest Partition',min_part,'\nSample Content of the largest Partition: \n'
print (sampled_rdd.mapPartitionsWithIndex(lambda i, it: islice(it, 0, 5) if i == max_part[0] else []).take(5))
else:
print 'No Skewness: Largest Partition',max_part,'Smallest Partition',min_part
Run Code Online (Sandbox Code Playgroud)
然后我只是传递我想要检查偏度的数据框,如下所示:
check_skewness(test_join)
Run Code Online (Sandbox Code Playgroud)
它给了我关于其偏度的很好的信息。
| 归档时间: |
|
| 查看次数: |
1656 次 |
| 最近记录: |