Mongodb聚合$ group,限制数组的长度

ma0*_*a08 15 database mongoose mongodb mongodb-query aggregation-framework

我想根据字段对所有文档进行分组,但是要限制为每个值分组的文档数.

每条消息都有一个conversation_ID.我需要为每个conversation_ID获取10或更少数量的消息.

我可以根据以下命令进行分组,但除了切片结果之外,无法弄清楚如何限制分组文档的数量 Message.aggregate({'$group':{_id:'$conversation_ID',msgs:{'$push':{msgid:'$_id'}}}})

如何将每个conversation_ID的msgs数组的长度限制为10?

Nei*_*unn 15

现代

从MongoDB 3.6开始,有一种"新颖"的方法,通过使用$lookup执行"自连接"的方式,与下面演示的原始光标处理大致相同.

因为在这个版本中你可以指定一个"pipeline"参数$lookup作为"join"的源,这实际上意味着你可以使用$match$limit收集和"限制"数组的条目:

db.messages.aggregate([
  { "$group": { "_id": "$conversation_ID" } },
  { "$lookup": {
    "from": "messages",
    "let": { "conversation": "$_id" },
    "pipeline": [
      { "$match": { "$expr": { "$eq": [ "$conversation_ID", "$$conversation" ] } }},
      { "$limit": 10 },
      { "$project": { "_id": 1 } }
    ],
    "as": "msgs"
  }}
])
Run Code Online (Sandbox Code Playgroud)

你可以选择在之后添加额外的投影$lookup,以便使数组项只是值而不是带有_id键的文档,但基本结果是通过简单地执行上述操作.

仍有优秀的SERVER-9277实际上直接要求"限制推送",但$lookup在此期间使用这种方式是一种可行的替代方案.

注意:$slice在原始内容中写出原始答案并提及"杰出的JIRA问题"后,也会引入.虽然您可以使用较小的结果集获得相同的结果,但它确实涉及仍然"将所有内容"推入数组中,然后将最终数组输出限制为所需的长度.

所以这是主要的区别,以及为什么它$slice对于大的结果通常是不实际的.但当然可以在它的情况下交替使用.

有关mongodb组值的更多详细信息,请参阅多个字段,以了解其他用法.


原版的

如前所述,这不是不可能的,但肯定是一个可怕的问题.

实际上,如果你主要担心的是你的结果数组会非常大,那么你最好的方法是将每个不同的"conversation_ID"作为单独的查询提交,然后结合你的结果.在非常MongoDB 2.6语法中,可能需要进行一些调整,具体取决于您的语言实现是什么:

var results = [];
db.messages.aggregate([
    { "$group": {
        "_id": "$conversation_ID"
    }}
]).forEach(function(doc) {
    db.messages.aggregate([
        { "$match": { "conversation_ID": doc._id } },
        { "$limit": 10 },
        { "$group": {
            "_id": "$conversation_ID",
            "msgs": { "$push": "$_id" }
        }}
    ]).forEach(function(res) {
        results.push( res );
    });
});
Run Code Online (Sandbox Code Playgroud)

但这一切都取决于你是否想要避免这种情况.那么真正的答案:


这里的第一个问题是没有"限制"被"推"到数组中的项目数的功能.这当然是我们想要的,但功能目前还不存在.

第二个问题是,即使将所有项目推送到数组中,也无法$slice在聚合管道中使用或使用任何类似的运算符.因此,目前没有办法通过简单的操作从生成的数组中获得"前10名"结果.

但实际上,您可以生成一组操作,以有效地"分割"您的分组边界.这是相当复杂的,例如在这里我将把数组元素"切成"仅减少到"六".这里的主要原因是演示该过程并展示如何执行此操作,而不会破坏不包含您要"切片"到的总数的数组.

给出一份文件样本:

{ "_id" : 1, "conversation_ID" : 123 }
{ "_id" : 2, "conversation_ID" : 123 }
{ "_id" : 3, "conversation_ID" : 123 }
{ "_id" : 4, "conversation_ID" : 123 }
{ "_id" : 5, "conversation_ID" : 123 }
{ "_id" : 6, "conversation_ID" : 123 }
{ "_id" : 7, "conversation_ID" : 123 }
{ "_id" : 8, "conversation_ID" : 123 }
{ "_id" : 9, "conversation_ID" : 123 }
{ "_id" : 10, "conversation_ID" : 123 }
{ "_id" : 11, "conversation_ID" : 123 }
{ "_id" : 12, "conversation_ID" : 456 }
{ "_id" : 13, "conversation_ID" : 456 }
{ "_id" : 14, "conversation_ID" : 456 }
{ "_id" : 15, "conversation_ID" : 456 }
{ "_id" : 16, "conversation_ID" : 456 }
Run Code Online (Sandbox Code Playgroud)

您可以看到,根据您的条件进行分组时,您将获得一个包含十个元素的数组,另一个包含"五个"元素.你想要做的是将两者都减少到顶部的"六"而不"破坏"只与"五"元素匹配的数组.

以下查询:

db.messages.aggregate([
    { "$group": {
        "_id": "$conversation_ID",
        "first": { "$first": "$_id" },
        "msgs": { "$push": "$_id" },
    }},
    { "$unwind": "$msgs" },
    { "$project": {
        "msgs": 1,
        "first": 1,
        "seen": { "$eq": [ "$first", "$msgs" ] }
    }},
    { "$sort": { "seen": 1 }},
    { "$group": {
        "_id": "$_id",
        "msgs": { 
            "$push": {
               "$cond": [ { "$not": "$seen" }, "$msgs", false ]
            }
        },
        "first": { "$first": "$first" },
        "second": { "$first": "$msgs" }
    }},
    { "$unwind": "$msgs" },
    { "$project": {
        "msgs": 1,
        "first": 1,
        "second": 1,
        "seen": { "$eq": [ "$second", "$msgs" ] }
    }},
    { "$sort": { "seen": 1 }},
    { "$group": {
        "_id": "$_id",
        "msgs": { 
            "$push": {
               "$cond": [ { "$not": "$seen" }, "$msgs", false ]
            }
        },
        "first": { "$first": "$first" },
        "second": { "$first": "$second" },
        "third": { "$first": "$msgs" }
    }},
    { "$unwind": "$msgs" },
    { "$project": {
        "msgs": 1,
        "first": 1,
        "second": 1,
        "third": 1,
        "seen": { "$eq": [ "$third", "$msgs" ] },
    }},
    { "$sort": { "seen": 1 }},
    { "$group": {
        "_id": "$_id",
        "msgs": { 
            "$push": {
               "$cond": [ { "$not": "$seen" }, "$msgs", false ]
            }
        },
        "first": { "$first": "$first" },
        "second": { "$first": "$second" },
        "third": { "$first": "$third" },
        "forth": { "$first": "$msgs" }
    }},
    { "$unwind": "$msgs" },
    { "$project": {
        "msgs": 1,
        "first": 1,
        "second": 1,
        "third": 1,
        "forth": 1,
        "seen": { "$eq": [ "$forth", "$msgs" ] }
    }},
    { "$sort": { "seen": 1 }},
    { "$group": {
        "_id": "$_id",
        "msgs": { 
            "$push": {
               "$cond": [ { "$not": "$seen" }, "$msgs", false ]
            }
        },
        "first": { "$first": "$first" },
        "second": { "$first": "$second" },
        "third": { "$first": "$third" },
        "forth": { "$first": "$forth" },
        "fifth": { "$first": "$msgs" }
    }},
    { "$unwind": "$msgs" },
    { "$project": {
        "msgs": 1,
        "first": 1,
        "second": 1,
        "third": 1,
        "forth": 1,
        "fifth": 1,
        "seen": { "$eq": [ "$fifth", "$msgs" ] }
    }},
    { "$sort": { "seen": 1 }},
    { "$group": {
        "_id": "$_id",
        "msgs": { 
            "$push": {
               "$cond": [ { "$not": "$seen" }, "$msgs", false ]
            }
        },
        "first": { "$first": "$first" },
        "second": { "$first": "$second" },
        "third": { "$first": "$third" },
        "forth": { "$first": "$forth" },
        "fifth": { "$first": "$fifth" },
        "sixth": { "$first": "$msgs" },
    }},
    { "$project": {
         "first": 1,
         "second": 1,
         "third": 1,
         "forth": 1,
         "fifth": 1,
         "sixth": 1,
         "pos": { "$const": [ 1,2,3,4,5,6 ] }
    }},
    { "$unwind": "$pos" },
    { "$group": {
        "_id": "$_id",
        "msgs": {
            "$push": {
                "$cond": [
                    { "$eq": [ "$pos", 1 ] },
                    "$first",
                    { "$cond": [
                        { "$eq": [ "$pos", 2 ] },
                        "$second",
                        { "$cond": [
                            { "$eq": [ "$pos", 3 ] },
                            "$third",
                            { "$cond": [
                                { "$eq": [ "$pos", 4 ] },
                                "$forth",
                                { "$cond": [
                                    { "$eq": [ "$pos", 5 ] },
                                    "$fifth",
                                    { "$cond": [
                                        { "$eq": [ "$pos", 6 ] },
                                        "$sixth",
                                        false
                                    ]}
                                ]}
                            ]}
                        ]}
                    ]}
                ]
            }
        }
    }},
    { "$unwind": "$msgs" },
    { "$match": { "msgs": { "$ne": false } }},
    { "$group": {
        "_id": "$_id",
        "msgs": { "$push": "$msgs" }
    }}
])
Run Code Online (Sandbox Code Playgroud)

您将获得阵列中的最佳结果,最多六个条目:

{ "_id" : 123, "msgs" : [ 1, 2, 3, 4, 5, 6 ] }
{ "_id" : 456, "msgs" : [ 12, 13, 14, 15 ] }
Run Code Online (Sandbox Code Playgroud)

正如你在这里看到的,充满乐趣.

在最初分组之后,您基本上希望$first从阵列结果中"弹出" 堆栈的值.为了简化这个过程,我们实际上在初始操作中执行此操作.所以过程变成:

  • $unwind 数组
  • 与已经在$eq相等匹配中看到的值进行比较
  • $sort结果将false看不见的值"浮动" 到顶部(这仍保留订单)
  • $group再次返回并将" $first看不见"的值"弹出" 为堆栈中的下一个成员.此外,它还使用$cond运算符替换数组堆栈中的"see"值,false以帮助进行评估.

最后的操作$cond就是确保将来的迭代不只是在"slice"计数大于数组成员的地方反复添加数组的最后一个值.

需要针对您希望"切片"的多个项目重复整个过程.由于我们已经在初始分组中找到了"第一"项,这意味着n-1迭代所需的切片结果.

最后的步骤实际上只是将所有内容转换回数组的可选示例,最终显示结果.因此,实际上只是有条件地推动项目或false按其匹配位置返回,最后"过滤"所有false值,以便结束阵列分别具有"六"和"五"成员.

因此,没有标准的运算符可以满足这一要求,您不能将推送"限制"为5或10或阵列中的任何项目.但如果你真的必须这样做,那么这是你最好的方法.


你可以使用mapReduce来解决这个问题并将聚合框架放在一起.我将采取的方法(在合理的限制范围内)将有效地在服务器上具有内存中的哈希映射并将数组累积到该数据库,同时使用JavaScript切片来"限制"结果:

db.messages.mapReduce(
    function () {

        if ( !stash.hasOwnProperty(this.conversation_ID) ) {
            stash[this.conversation_ID] = [];
        }

        if ( stash[this.conversation_ID.length < maxLen ) {
            stash[this.conversation_ID].push( this._id );
            emit( this.conversation_ID, 1 );
        }

    },
    function(key,values) {
        return 1;   // really just want to keep the keys
    },
    { 
        "scope": { "stash": {}, "maxLen": 10 },
        "finalize": function(key,value) {
            return { "msgs": stash[key] };                
        },
        "out": { "inline": 1 }
    }
)
Run Code Online (Sandbox Code Playgroud)

因此,基本上构建"内存中"对象,匹配发出的"键",数组永远不会超过您想从结果中获取的最大大小.另外,当满足最大堆栈时,这甚至不会"发出"该项目.

除了基本上只减少到"关键"和单个值之外,reduce部分实际上什么都不做.因此,万一我们的reducer没有被调用,如果一个键只存在1个值,那么finalize函数负责将"stash"键映射到最终输出.

其有效性因输出的大小而异,JavaScript评估肯定不会很快,但可能比在管道中处理大型数组更快.


投票了JIRA问题实际上有"$推"和"$ addToSet"一"片"运营商甚至是"限价",这既得心应手.个人希望至少可以对$map操作员进行一些修改,以便在处理时暴露"当前索引"值.这将有效地允许"切片"和其他操作.

实际上,您可能希望将其编码为"生成"所有必需的迭代.如果这里的答案得到足够的爱和/或其他时间待定,那么我可能会添加一些代码来演示如何执行此操作.这已经是一个相当长的反应.


生成管道的代码:

var key = "$conversation_ID";
var val = "$_id";
var maxLen = 10;

var stack = [];
var pipe = [];
var fproj = { "$project": { "pos": { "$const": []  } } };

for ( var x = 1; x <= maxLen; x++ ) {

    fproj["$project"][""+x] = 1;
    fproj["$project"]["pos"]["$const"].push( x );

    var rec = {
        "$cond": [ { "$eq": [ "$pos", x ] }, "$"+x ]
    };
    if ( stack.length == 0 ) {
        rec["$cond"].push( false );
    } else {
        lval = stack.pop();
        rec["$cond"].push( lval );
    }

    stack.push( rec );

    if ( x == 1) {
        pipe.push({ "$group": {
           "_id": key,
           "1": { "$first": val },
           "msgs": { "$push": val }
        }});
    } else {
        pipe.push({ "$unwind": "$msgs" });
        var proj = {
            "$project": {
                "msgs": 1
            }
        };

        proj["$project"]["seen"] = { "$eq": [ "$"+(x-1), "$msgs" ] };

        var grp = {
            "$group": {
                "_id": "$_id",
                "msgs": {
                    "$push": {
                        "$cond": [ { "$not": "$seen" }, "$msgs", false ]
                    }
                }
            }
        };

        for ( n=x; n >= 1; n-- ) {
            if ( n != x ) 
                proj["$project"][""+n] = 1;
            grp["$group"][""+n] = ( n == x ) ? { "$first": "$msgs" } : { "$first": "$"+n };
        }

        pipe.push( proj );
        pipe.push({ "$sort": { "seen": 1 } });
        pipe.push(grp);
    }
}

pipe.push(fproj);
pipe.push({ "$unwind": "$pos" });
pipe.push({
    "$group": {
        "_id": "$_id",
        "msgs": { "$push": stack[0] }
    }
});
pipe.push({ "$unwind": "$msgs" });
pipe.push({ "$match": { "msgs": { "$ne": false } }});
pipe.push({
    "$group": {
        "_id": "$_id",
        "msgs": { "$push": "$msgs" }
    }
}); 
Run Code Online (Sandbox Code Playgroud)

这构建了基本的迭代方法,直到maxLen步骤$unwind$group.还嵌入了所需的最终投影和"嵌套"条件语句的详细信息.最后一个基本上是针对这个问题的方法:

MongoDB的$ in子句是否保证订单?