Mongodb在组内聚合排序和限制

Car*_*sel 4 mongodb mongodb-query aggregation-framework

我有以下架构的待售物品集合:

var itemSchema = new Schema({
    "category" : { type : Schema.Types.ObjectId, ref : 'Category' },
    "merchant" : { type : Schema.Types.ObjectId, ref : 'Merchant' },
    "rating" : Number
})
Run Code Online (Sandbox Code Playgroud)

我继承了一个聚合查询,该查询返回与按类别划分的类别的项目,按商家分组,组按组中的最大评级排序:

Item.aggregate([
      { "$match" : { category : categoryId, merchant : { $ne : null }}},
      { "$group" : { _id : "$merchant", 
                    rating : { $max : "$rating" }, 
                    items : { $push : "$$ROOT" }}},
      { "$sort" : { rating : -1 }}
    ], { allowDiskUse : true })
    .skip(skip)
    .limit(limit)
Run Code Online (Sandbox Code Playgroud)

在此之后,代码继续按评级对每个组中的项目进行排序,并删除除了每个组中排名前2位的最高评级项目之外的所有项目.

是否可以在组内执行此排序和限制作为聚合函数的一部分,以便聚合只返回每组最高评分的两个项目?

Bla*_*ven 11

基本问题

在可预见的不久的将来,在当前的聚合框架中试图做到这一点并不是最明智的想法.主要问题当然来自您已经拥有的代码中的这一行:

"items" : { "$push": "$$ROOT" }
Run Code Online (Sandbox Code Playgroud)

这恰恰意味着,需要基本上发生的是,需要将分组键中的所有对象推入数组,以便在任何后续代码中获得"前N个"结果.

这显然不会扩展,因为该数组本身的大小可能非常可能超过16MB的BSON限制,并且与分组文档中的其余数据无关.这里的主要问题是不可能"限制推动"仅限于一定数量的项目.关于这样的事情,有一个长期存在的JIRA问题.

仅仅因为这个原因,最实用的方法是对每个分组键的"前N个"项运行单独的查询.这些甚至不需要是.aggregate()声明(取决于数据),并且可以真正地只是限制你想要的"前N"值.

最佳方法

你的架构似乎是node.jsmongoose,但任何支持异步IO和查询的并行执行将是最好的选择.理想情况下,它拥有自己的API库,支持将这些查询的结果合并到一个响应中.

例如,这个简化的示例列表使用您的架构和可用的库(特别是async)完成并行和组合结果:

var async = require('async'),
    mongoose = require('mongoose'),
    Schema = mongoose.Schema;

mongoose.connect('mongodb://localhost/test');

var data = [
  { "merchant": 1, "rating": 1 },
  { "merchant": 1, "rating": 2 },
  { "merchant": 1, "rating": 3 },
  { "merchant": 2, "rating": 1 },
  { "merchant": 2, "rating": 2 },
  { "merchant": 2, "rating": 3 }
];

var testSchema = new Schema({
  merchant: Number,
  rating: Number
});

var Test = mongoose.model( 'Test', testSchema, 'test' );

async.series(
  [
    function(callback) {
      Test.remove({},callback);
    },
    function(callback) {
      async.each(data,function(item,callback) {
        Test.create(item,callback);
      },callback);
    },
    function(callback) {
      async.waterfall(
        [
          function(callback) {
            Test.distinct("merchant",callback);
          },
          function(merchants,callback) {
            async.concat(
              merchants,
              function(merchant,callback) {
                Test.find({ "merchant": merchant })
                  .sort({ "rating": -1 })
                  .limit(2)
                  .exec(callback);
              },
              function(err,results) {
                console.log(JSON.stringify(results,undefined,2));
                callback(err);
              }
            );
          }
        ],
        callback
      );
    }
  ],
  function(err) {
    if (err) throw err;
    mongoose.disconnect();
  }
);
Run Code Online (Sandbox Code Playgroud)

这导致输出中每个商家的前2个结果:

[
  {
    "_id": "560d153669fab495071553ce",
    "merchant": 1,
    "rating": 3,
    "__v": 0
  },
  {
    "_id": "560d153669fab495071553cd",
    "merchant": 1,
    "rating": 2,
    "__v": 0
  },
  {
    "_id": "560d153669fab495071553d1",
    "merchant": 2,
    "rating": 3,
    "__v": 0
  },
  {
    "_id": "560d153669fab495071553d0",
    "merchant": 2,
    "rating": 2,
    "__v": 0
  }
]
Run Code Online (Sandbox Code Playgroud)

它确实是最有效的处理方式,虽然它会占用资源,因为它仍然是多个查询.但是,如果您尝试将所有文​​档存储在数组中并进行处理,那么聚合管道中的资源就不会出现.

聚合问题,现在和将来

对于该行,可以考虑到文档的数量不会导致BSON限制的违反,这可以做到这一点.当前版本的MongoDB的方法并不是很好,但即将发布的版本(编写时,3.1.8 dev分支这样做)至少会将一个$slice运算符引入聚合管道.因此,如果您更聪明地进行聚合操作并使用$sort第一个,那么可以轻松选择数组中已经排序的项:

var async = require('async'),
    mongoose = require('mongoose'),
    Schema = mongoose.Schema;

mongoose.connect('mongodb://localhost/test');

var data = [
  { "merchant": 1, "rating": 1 },
  { "merchant": 1, "rating": 2 },
  { "merchant": 1, "rating": 3 },
  { "merchant": 2, "rating": 1 },
  { "merchant": 2, "rating": 2 },
  { "merchant": 2, "rating": 3 }
];

var testSchema = new Schema({
  merchant: Number,
  rating: Number
});

var Test = mongoose.model( 'Test', testSchema, 'test' );

async.series(
  [
    function(callback) {
      Test.remove({},callback);
    },
    function(callback) {
      async.each(data,function(item,callback) {
        Test.create(item,callback);
      },callback);
    },
    function(callback) {
      Test.aggregate(
        [
          { "$sort": { "merchant": 1, "rating": -1 } },
          { "$group": {
            "_id": "$merchant",
            "items": { "$push": "$$ROOT" }
          }},
          { "$project": {
            "items": { "$slice": [ "$items", 2 ] }
          }}
        ],
        function(err,results) {
          console.log(JSON.stringify(results,undefined,2));
          callback(err);
        }
      );
    }
  ],
  function(err) {
    if (err) throw err;
    mongoose.disconnect();
  }
);
Run Code Online (Sandbox Code Playgroud)

这产生了与前2个项目相同的基本结果,一旦它们被排序,它们就会从数组中"切片".

它在当前版本中实际上也是"可能的",但是具有相同的基本约束,这仍然涉及在首先对内容进行排序之后将所有内容推送到数组中.它只需要一种"迭代"的方法.您可以对此进行编码以生成更多条目的聚合管道,但只显示"两个"应该表明尝试不是一个非常好的主意:

var async = require('async'),
    mongoose = require('mongoose'),
    Schema = mongoose.Schema;

mongoose.connect('mongodb://localhost/test');

var data = [
  { "merchant": 1, "rating": 1 },
  { "merchant": 1, "rating": 2 },
  { "merchant": 1, "rating": 3 },
  { "merchant": 2, "rating": 1 },
  { "merchant": 2, "rating": 2 },
  { "merchant": 2, "rating": 3 }
];

var testSchema = new Schema({
  merchant: Number,
  rating: Number
});

var Test = mongoose.model( 'Test', testSchema, 'test' );

async.series(
  [
    function(callback) {
      Test.remove({},callback);
    },
    function(callback) {
      async.each(data,function(item,callback) {
        Test.create(item,callback);
      },callback);
    },
    function(callback) {
      Test.aggregate(
        [
          { "$sort": { "merchant": 1, "rating": -1 } },
          { "$group": {
            "_id": "$merchant",
            "items": { "$push": "$$ROOT" }
          }},
          { "$unwind": "$items" },
          { "$group": {
            "_id": "$_id",
            "first": { "$first": "$items" },
            "items": { "$push": "$items" }
          }},
          { "$unwind": "$items" },
          { "$redact": {
            "$cond": [
              { "$eq": [ "$items", "$first" ] },
              "$$PRUNE",
              "$$KEEP"
            ]
          }},
          { "$group": {
            "_id": "$_id",
            "first": { "$first": "$first" },
            "second": { "$first": "$items" }
          }},
          { "$project": {
            "items": {
              "$map": {
                "input": ["A","B"],
                "as": "el",
                "in": {
                  "$cond": [
                    { "$eq": [ "$$el", "A" ] },
                    "$first",
                    "$second"
                  ]
                }
              }
            }
          }}
        ],
        function(err,results) {
          console.log(JSON.stringify(results,undefined,2));
          callback(err);
        }
      );
    }
  ],
  function(err) {
    if (err) throw err;
    mongoose.disconnect();
  }
);
Run Code Online (Sandbox Code Playgroud)

在早期版本中再次使用"可能"(这是因为已经标记了2.6引入的功能$$ROOT),基本步骤是存储数组,然后使用$first和比较(以及可能的其他)使每个项目"脱离堆栈" 数组中的项目将删除它们然后从该堆栈中获取"下一个第一"项目,直到最终完成"前N".


结论

直到有一天这样的操作允许$push聚合累加器中的项目被限制为某个计数,那么这对于聚合来说实际上并不是一个实际的操作.

如果这些结果中的数据足够小,您可以这样做,如果数据库服务器具有足够的规格来提供真正的优势,它甚至可能比客户端处理更有效.但是,在大多数合理使用的实际应用中,两者都不会出现这种情况.

最好的办法是先使用"并行查询"选项.它总是可以很好地扩展,并且不需要"编码"这样的逻辑,即特定的分组可能不会返回至少所需的总"前N"项并找出如何保留它们(更长的例子省略了)因为它只是执行每个查询并组合结果.

使用并行查询.它将比你拥有的编码方法更好,并且它将超越长期证明的聚合方法.直到至少有一个更好的选择.