我正在加入两个RDD rddA
和rddB
.
rddA
有100个分区,rddB
有500个分区.
我试图了解join
操作的机制.默认情况下,无论连接的顺序如何,我都会使用相同的分区结构; ie rddA.join(rddB
)并rddB.join(rddA)
产生相同数量的分区,并通过观察它使用较小的分区大小100.我知道我可以通过使用增加分区大小rddA.join(rddB,500)
,但我更感兴趣的是引擎盖下发生的事情以及为什么选择较小的尺寸.从观察来看,即使我重新分区小rdd
,它的分区仍然会被使用; Spark是否对密钥大小进行任何启发式分析?
我遇到的另一个问题是我得到的偏斜程度.我的较小分区最终有3,314个条目,较大的分区最终为1,139,207,总大小为599,911,729(密钥).两个RDD都使用默认分区程序,因此如何确定数据随机数?我依旧回忆起如果有人rdd
设置了分区器,那就是将要使用的分区器.是这样的吗?是"建议"这样做吗?
最后,请注意我rdd
的两个都比较大(~90GB),因此广播加入无济于事.相反,任何为操作提供一些见解join
的方法都可能是要走的路.
PS.关于机械师左右加入的任何细节都将是一个额外的奖励:)
虽然我还没有设法解释如何派生分区,但我确实找到了数据如何被洗牌(这是我最初的问题).联接有一些副作用:
改组/分区: Spark将散列分区'RDD'键并在'Workers'之间移动/分配.给定密钥的每组值(例如5)将最终出现在单个"Worker"/ JVM中.这意味着如果你的'join'具有1..N关系并且N严重偏斜,你将最终得到偏斜的分区和JVM堆(即一个'Partition'可能有Max(N)而另一个Min(N) ).避免这种情况的唯一方法是尽可能使用"广播"或忍受此行为.由于数据最初将均匀分布,因此混洗量将取决于密钥散列.
重新分区: 在"倾斜"连接之后,调用"重新分区"似乎在分区之间均匀地重新分配数据.如果您有不可避免的偏斜问题,那么这是一件好事.请注意,虽然这种转换会引发严重的混乱,但是后续操作会更快.这方面的缺点是无法控制的对象创建(见下文)
对象创建/堆污染: 您设法加入您的数据认为重新分区是重新平衡您的群集的好主意,但由于某种原因,"重新分区"会触发"OOME".发生的事情是最初连接的数据重新使用连接的对象.当您触发"重新分区"或涉及混乱的任何其他"动作"时,例如额外的连接或"groupBy"(后跟"动作"),数据会被序列化,因此您将丢失对象重用.一旦对象被反序列化,它们现在就是新实例.还要注意,在序列化期间,重复使用会丢失,因此后缀将非常繁重.因此,在我的情况下,一个1..1000000连接(其中1是我的'重'对象),将在触发shuffle的任何操作后失败.
解决方法/调试:
归档时间: |
|
查看次数: |
1118 次 |
最近记录: |