我正在从s3加载数万个gzip压缩文件,用于我的火花工作.这导致一些分区非常小(10个记录)和一些非常大(10000个记录).分区的大小非常好地分布在节点之间,因此每个执行器似乎在聚合中处理相同数量的数据.所以我不确定我是否有问题.
我怎么知道是否值得重新分区或合并RDD?这些中的任何一个都可以在不拖沓数据的情况下平衡分区吗?此外,RDD不会被重用,只是映射然后加入另一个RDD.
有趣的问题.关于合并与重新分区,合并肯定会更好,因为它不会触发完全洗牌.通常,当跨分区(例如,在过滤器之后)有稀疏数据时,建议使用合并.我认为这是一个类似的情况,但直接从最初的负载.但是,我真的认为,由于你在初始加载后使用RDD做了什么,合并可能对你有所帮助.
当您将连接应用于加载的RDD时,如果数据被洗牌,Spark会咨询shuffle管理器以查看它应该使用哪个shuffle实现(通过配置spark.shuffle.manager).shuffle管理器有两种实现:( hash默认版本<1.2.0)和sort(默认> = 1.2.0).
如果使用hash实现,则每个输入分区将创建输出文件以发送到将进行连接的相应reducer.这可以创建一个巨大的文件爆炸,可以通过设置spark.shuffle.consolidateFiles为true 来缓解,但如果有很多分区作为输入,最终可能会给你一个非常慢的连接.如果使用此实现,合并绝对是值得的,因为大量的输入分区可以产生大量文件以减少.
如果使用了sort实现,则每个分区只有一个输出文件(whew!),并且文件被索引,以便reducers可以从它们各自的索引中获取它们的键.但是,对于许多输入分区,Spark仍将从所有输入分区读取以收集每个可能的密钥.如果使用此实现,则合并可能仍然值得,因为将此搜索和读取应用于每个分区也可能是昂贵的.
如果您最终使用合并,则您可能需要调整您想要合并的分区数,因为合并将是执行计划中的一个步骤.但是,此步骤可能会为您节省非常昂贵的加入费用.另外,作为附注,这篇文章非常有助于解释改组背后的实现.
| 归档时间: |
|
| 查看次数: |
4131 次 |
| 最近记录: |