MongoDB Spark Connector-聚合速度很慢

j9d*_*9dy 5 mongodb mongodb-java mongodb-query apache-spark

我在Spark应用程序和Mongos控制台上运行相同的聚合管道。在控制台上,眨眼间即可获取数据,只需第二次使用“ it”即可检索所有预期数据。但是,根据Spark WebUI,Spark应用程序将花费近两分钟的时间。

在此处输入图片说明

如您所见,正在启动242个任务来获取结果。我不确定为什么在MongoDB聚合仅返回40个文档的同时启动如此大量的任务。看起来开销很高。

我在Mongos控制台上运行的查询:

db.data.aggregate([
   {
      $match:{
         signals:{
            $elemMatch:{
               signal:"SomeSignal",
               value:{
                  $gt:0,
                  $lte:100
               }
            }
         }
      }
   },
   {
      $group:{
         _id:"$root_document",
         firstTimestamp:{
            $min:"$ts"
         },
         lastTimestamp:{
            $max:"$ts"
         },
         count:{
            $sum:1
         }
      }
   }
])
Run Code Online (Sandbox Code Playgroud)

Spark应用程序代码

    JavaMongoRDD<Document> rdd = MongoSpark.load(sc);

    JavaMongoRDD<Document> aggregatedRdd = rdd.withPipeline(Arrays.asList(
            Document.parse(
                    "{ $match: { signals: { $elemMatch: { signal: \"SomeSignal\", value: { $gt: 0, $lte: 100 } } } } }"),
            Document.parse(
                    "{ $group : { _id : \"$root_document\", firstTimestamp: { $min: \"$ts\"}, lastTimestamp: { $max: \"$ts\"} , count: { $sum: 1 } } }")));

    JavaRDD<String> outputRdd = aggregatedRdd.map(new Function<Document, String>() {
        @Override
        public String call(Document arg0) throws Exception {
            String output = String.format("%s;%s;%s;%s", arg0.get("_id").toString(),
                    arg0.get("firstTimestamp").toString(), arg0.get("lastTimestamp").toString(),
                    arg0.get("count").toString());
            return output;
        }
    });

    outputRdd.saveAsTextFile("/user/spark/output");
Run Code Online (Sandbox Code Playgroud)

之后,我使用hdfs dfs -getmerge /user/spark/output/ output.csv并比较结果。

为什么聚合如此缓慢?难道不是withPipeline要减少需要传输到Spark的数据量吗?看起来它没有像Mongos控制台那样进行聚合。在Mongos控制台上,它正在快速发展。我正在使用Spark 1.6.1和mongo-spark-connector_2.10版本1.1.0。

编辑:我想知道的另一件事是启动了两个执行器(因为我正在使用默认执行设置atm),但是只有一个执行器可以完成所有工作。第二个执行者为什么不做任何工作?

在此处输入图片说明

编辑2:当使用其他聚合管道并调用.count()而不是时saveAsTextFile(..),还将创建242个任务。这次将返回65.000个文档。 在此处输入图片说明

Ros*_*oss 3

任务数量过多是由默认的 Mongo Spark 分区器策略引起的。它在计算分区时忽略聚合管道,主要原因有两个:

  1. 减少计算分区的成本
  2. 确保分片和非分片分区程序具有相同的行为

但是,正如您所发现的,它们可以生成空分区,这在您的情况下是昂贵的。

修复的选择可能是:

  1. 更改分区策略

    选择替代分区器以减少分区数量。例如,PaginateByCount 会将数据库分割成一定数量的分区。

    创建您自己的分区器 - 只需实现该特征,您就可以应用聚合管道并对结果进行分区。有关示例,请参阅HalfwayPartitioner自定义分区程序测试。

  2. 使用 $out 将结果预先聚合到一个集合中并从那里读取。

  3. 用于coalesce(N)将分区合并在一起并减少分区数量。
  4. 增加spark.mongodb.input.partitionerOptions.partitionSizeMB配置以产生更少的分区。

自定义分区程序应该产生最佳解决方案,但有一些方法可以更好地利用可用的默认分区程序。

如果您认为应该有一个使用聚合管道来计算分区的默认分区器,那么请向 MongoDB Spark Jira 项目添加票证。