Spark到mongo db插入需要60小时才能获取60Gb数据

Swa*_*p P 0 mongodb apache-spark

我正在使用Spark:1.6.2MongoDB:3.2.8

我有一个8列和10亿行的数据框。随机写入数据帧为60GB。

我将使用mongo-spark-conector(mongo-spark-connector_2.10)在mongodb中插入该数据框。

MongoSpark.write(sourceValueDf).options(mongoDbOptions).mode(SaveMode.Append).save();
Run Code Online (Sandbox Code Playgroud)

插入需要10个小时以上。

如何提高性能?

Ros*_*oss 6

没什么可继续的:

MongoSpark.write(sourceValueDf).options(mongoDbOptions).mode(SaveMode.Append).save()

但是无论mongoDBOptions调整如何,都需要进行两次折叠,并且性能瓶颈必须在Spark和MongoDB中修复。成功的关键是理解上面代码的运行情况,然后才能确定提高性能的最佳方法。

火花

我有一个8列和10亿行的数据框。随机写入数据帧为60GB。

上没有任何信息,sourceValueDf但是您将需要配置源并了解瓶颈所在?请参阅Spark监视文档,以了解如何更多了解Spark作业中发生的事情。

通常,Spark调整的关键点是:分区缓存序列化随机操作。有关更多信息,请参见cloudera撰写的精彩博客文章:使用Apache Spark:或者,如何学习停止烦恼和热爱随机播放。如何改善Spark作业有很多潜在的优势。

MongoDB

让我们看一下MongoDB Spark连接器的作用:

MongoSpark.write(sourceValueDf).options(mongoDbOptions).mode(SaveMode.Append).save()

在此,连接器采用底层的RDD,并使用以下逻辑将数据作为文档保存在现有数据库中:

rdd.foreachPartition(iter => if (iter.nonEmpty) {
    mongoConnector.withCollectionDo(writeConfig, { collection: MongoCollection[D] =>
    iter.grouped(DefaultMaxBatchSize).foreach(batch => collection.insertMany(batch.toList.asJava))
    })
})
Run Code Online (Sandbox Code Playgroud)

对于每个分区,它将每批使用512个文档(底层Java驱动程序批量大小)批处理写入insertMany的内容。分区中的少量分区sourceValueDf可能会对保存的性能产生负面影响。增加分区的数量可能会提高此方法在Spark工作者之间的并行性,从而提高吞吐量。

还有其他一些通用方法可以提高对MongoDB的批量操作的写入性能:

  • 网络

    确保Spark Worker和MongoDB实例位于同一位置或具有尽可能小的网络跃点。您无法击败物理学。

  • 分片

    通过插入分片集合来提高写入的并行性,尤其是在插入按分片键预先排序的数据时。Spark工人与Sharded MongoD的并置可以提供最快的写入场景。有关配置选项的更多信息,请参见连接器文档中的“ 如何实现数据局部性”部分。

  • 指标

    在插入数据之前先删除索引,然后再重建它们。当插入大量数据时,用户发现性能提高了,方法是在流程开始时删除索引,然后在索引结束时仅构建一次。例如:

    val writeConfig = WriteConfig(mongoDbOptions)
    MongoConnector(writeConfig.asOptions).withCollectionDo(writeConfig, {
      coll: MongoCollection[Document] => coll.dropIndex("index")
    })
    
    MongoSpark.write(sourceValueDf)
              .options(writeConfig.asOptions)
              .mode(SaveMode.Append)
              .save()
    
    MongoConnector(writeConfig.asOptions).withCollectionDo(writeConfig, {  
      coll: MongoCollection[Document] => coll.createIndex(...)
    })
    
    Run Code Online (Sandbox Code Playgroud)
  • 写关注

    仅写入主节点而不等待复制可以提高速度,但要以冗余为代价。可以通过WriteConfig/ 配置mongoDbOptions。请参阅输出配置文档

运行此作业时,MongoDB计算机上的负载是多少?是瓶颈吗?诸如MongoDB Cloud Manager之类的系统可提供完整的性能可见性和监控,以帮助您了解MongoDB层正在发生的事情。

改善MongoDB和Spark性能

简而言之,没有灵丹妙药或魔术配置选项可以帮助提高性能。它将需要调试,了解当前问题以及可能考虑的Spark和MongoDB集群配置。在一起,它们已经被证明可以提供非常快速的计算和存储,但是这取决于用法和每个系统协同工作。

第一步是使用可用的监视工具来了解瓶颈在哪里。