聚合并更新MongoDB

Mou*_*tte 2 mongodb mongodb-query aggregation-framework

我有2个收藏:

  • 客户(6 000 000份文件)
  • 订单(5 000 000份文件)

每天一次,我想计算过去一年,过去一个月和过去一周的订单数量等等.

我试过这个:

db.orders.aggregate(
    {$match: 
        { date_order: { $gt: v_date1year } }
    },
    {$group : {
        _id : "$id_client", 
        count : {$sum : 1}
    }} ,
    {
        "$out": "tmp_indicators"
    }
)

db.tmp_indicators.find({}).forEach(function (my_client) { 
    db.clients.update (
        {"id_client": my_client._id},
        {"$set": 
            { "nb_orders_1year" : my_client.count }
        }
    )
})
Run Code Online (Sandbox Code Playgroud)

我必须这样做3次,过去一年聚合1次,过去一个月1次,过去一周1次.治疗非常缓慢,您是否知道如何以更好的方式执行它?

chr*_*dam 6

为了提高性能,尤其是在处理大型集合时,请利用Bulk()API进行批量更新,因为您将批量发送操作到服务器(例如,批量大小为1000),这样可以提供更好的性能.不会将每个请求发送到服务器(因为您当前正在使用forEach()循环中的更新语句),而是每1000个请求只发送一次,从而使您的更新比当前更高效,更快.

以下示例演示了此方法,第一个使用Bulk()MongoDB版本中提供的API >= 2.6 and < 3.2.它clients通过nb_orders_1year使用聚合结果中的值更改字段来更新集合中的所有文档.

由于该aggregate()方法返回a cursor,您可以使用聚合输出集合的forEach()方法对其进行迭代并访问每个文档,从而批量设置批量更新操作,然后使用API​​有效地通过服务器发送:

var bulk = db.clients.initializeUnorderedBulkOp(),
    pipeline = [
        {
            "$match": { "date_order": { "$gt": v_date1year } }
        },
        {
            "$group": {
                "_id": "$id_client", 
                "count": { "$sum" : 1 }
            }
        },
        { "$out": "tmp_indicators" }        
    ],
    counter = 0;

db.orders.aggregate(pipeline);  
db.tmp_indicators.find().forEach(function (doc) {       
    bulk.find({ "_id": doc._id }).updateOne({ 
        "$set": { "nb_orders_1year": doc.count }
    });

    counter++;
    if (counter % 1000 == 0) {
        bulk.execute(); // Execute per 1000 operations and re-initialize every 1000 update statements
        bulk = db.clients.initializeUnorderedBulkOp();
    }
});
// Clean up remaining operations in queue
if (counter % 1000 != 0) { bulk.execute(); }
Run Code Online (Sandbox Code Playgroud)

下一个示例适用于新的MongoDB版本3.2,该版本已弃用Bulk API并提供了一组较新的apis bulkWrite().

它使用与上面相同的游标,但不是迭代结果,而是使用其map()方法创建具有批量操作的数组:

 var pipeline = [
        {
            "$match": { "date_order": { "$gt": v_date1year } }
        },
        {
            "$group": {
                "_id": "$id_client", 
                "count": { "$sum" : 1 }
            }
        },
        { "$out": "tmp_indicators" }        
    ];
db.orders.aggregate(pipeline);
var bulkOps = db.tmp_indicators.find().map(function (doc) { 
        return { 
            "updateOne": { 
                "filter": { "_id": doc._id } ,              
                "update": { "$set": { "nb_orders_1year": doc.count } } 
            }         
        };
    });

db.clients.bulkWrite(bulkOps, { "ordered": true });
Run Code Online (Sandbox Code Playgroud)