锄头做Spark计划加入?

Ioa*_*nis 4 java apache-spark

我正在加入两个RDD rddArddB.

rddA有100个分区,rddB有500个分区.

我试图了解join操作的机制.默认情况下,无论连接的顺序如何,我都会使用相同的分区结构; ie rddA.join(rddB)并rddB.join(rddA)产生相同数量的分区,并通过观察它使用较小的分区大小100.我知道我可以通过使用增加分区大小rddA.join(rddB,500),但我更感兴趣的是引擎盖下发生的事情以及为什么选择较小的尺寸.从观察来看,即使我重新分区小rdd,它的分区仍然会被使用; Spark是否对密钥大小进行任何启发式分析?

我遇到的另一个问题是我得到的偏斜程度.我的较小分区最终有3,314个条目,较大的分区最终为1,139,207,总大小为599,911,729(密钥).两个RDD都使用默认分区程序,因此如何确定数据随机数?我依旧回忆起如果有人rdd设置了分区器,那就是将要使用的分区器.是这样的吗?是"建议"这样做吗?

最后,请注意我rdd的两个都比较大(~90GB),因此广播加入无济于事.相反,任何为操作提供一些见解join的方法都可能是要走的路.

PS.关于机械师左右加入的任何细节都将是一个额外的奖励:)

Ioa*_*nis 7

虽然我还没有设法解释如何派生分区,但我确实找到了数据如何被洗牌(这是我最初的问题).联接有一些副作用:

改组/分区: Spark将散列分区'RDD'键并在'Workers'之间移动/分配.给定密钥的每组值(例如5)将最终出现在单个"Worker"/ JVM中.这意味着如果你的'join'具有1..N关系并且N严重偏斜,你将最终得到偏斜的分区和JVM堆(即一个'Partition'可能有Max(N)而另一个Min(N) ).避免这种情况的唯一方法是尽可能使用"广播"或忍受此行为.由于数据最初将均匀分布,因此混洗量将取决于密钥散列.

重新分区: 在"倾斜"连接之后,调用"重新分区"似乎在分区之间均匀地重新分配数据.如果您有不可避免的偏斜问题,那么这是一件好事.请注意,虽然这种转换会引发严重的混乱,但是后续操作会更快.这方面的缺点是无法控制的对象创建(见下文)

对象创建/堆污染: 您设法加入您的数据认为重新分区是重新平衡您的群集的好主意,但由于某种原因,"重新分区"会触发"OOME".发生的事情是最初连接的数据重新使用连接的对象.当您触发"重新分区"或涉及混乱的任何其他"动作"时,例如额外的连接或"groupBy"(后跟"动作"),数据会被序列化,因此您将丢失对象重用.一旦对象被反序列化,它们现在就是新实例.还要注意,在序列化期间,重复使用会丢失,因此后缀将非常繁重.因此,在我的情况下,一个1..1000000连接(其中1是我的'重'对象),将在触发shuffle的任何操作后失败.

解决方法/调试:

  1. 我使用'mapPartitionsWithIndex'来调试分区大小,方法是返回单个项目'Iterable>'和每个分区的计数.这非常有用,因为您可以在"操作"后看到"重新分区"的效果和分区的状态.
  2. 您可以在连接RDD上使用'countByKeyApprox'或'countByKey'来感受基数,然后分两步应用连接.使用"广播"为您提供高基数键,为低基数键使用"加入".将这些操作包装在'rdd.cache()'和'rdd.unpersist()'块中将显着加快此过程.虽然这可能会使您的代码稍微复杂化,但它会提供更好的性能,尤其是在您执行后续操作时.另请注意,如果您在每个"地图"中使用"广播"进行查找,您还将显着减少随机播放的大小.
  3. 调用影响分区数量的其他操作的"重新分区"非常有用,但要注意(随机)大量分区会导致更多的错误,因为给定密钥的大集将创建大分区,但另一个分区将具有较小的大小或0.创建调试方法以获得分区大小将帮助您选择一个好的大小.