use*_*697 214 mongodb mongodb-query aggregation-framework
我如何(在MongoDB中)将来自多个集合的数据合并到一个集合中?
我可以使用map-reduce吗?如果是,那么如何?
我非常感谢一些例子,因为我是新手.
rma*_*her 142
虽然您无法实时执行此操作,但您可以使用MongoDB 1.8+ map/reduce中的"reduce"输出选项,多次运行map-reduce以将数据合并在一起(请参阅 http://www.mongodb.org/display/DOCS/MapReduce#MapReduce-Outputoptions ).两个集合中需要有一个共同的键,用作合并后文档的_id.
例如,假设您有一个users集合和一个comments集合,并且您希望拥有一个新集合,其中包含每个评论的一些用户人口统计信息.
假设该users集合包含以下字段: _id、firstName、lastName、country、gender、age
然后该comments集合包含以下字段: _id、userId、comment、created
你会做这个map/reduce:
var mapUsers, mapComments, reduce;
// Empty the output collection first so that repeated runs do not merge new
// results into stale ones. An explicit empty filter is passed because newer
// shells reject remove() when it is called without a query.
db.users_comments.remove({});
// setup sample data - wouldn't actually use this in production
db.users.remove({});
db.comments.remove({});
db.users.save({firstName:"Rich",lastName:"S",gender:"M",country:"CA",age:"18"});
db.users.save({firstName:"Rob",lastName:"M",gender:"M",country:"US",age:"25"});
db.users.save({firstName:"Sarah",lastName:"T",gender:"F",country:"US",age:"13"});
// Materialize the cursor into an array so the users can be addressed by
// position below without relying on cursor indexing.
var users = db.users.find().toArray();
db.comments.save({userId: users[0]._id, "comment": "Hey, what's up?", created: new ISODate()});
db.comments.save({userId: users[1]._id, "comment": "Not much", created: new ISODate()});
db.comments.save({userId: users[0]._id, "comment": "Cool", created: new ISODate()});
// end sample data setup
// Map step for the users collection: emits the demographic fields of the
// current user document (bound to `this`), keyed by the user's _id.
mapUsers = function() {
    emit(this._id, {
        country: this.country,
        gender: this.gender,
        age: this.age
    });
};
// Map step for the comments collection: emits the comment's own fields keyed
// by the id of the user who wrote it, so that it lands under the same key as
// that user's demographic record.
mapComments = function() {
    emit(this.userId, {
        commentId: this._id,
        comment: this.comment,
        created: this.created
    });
};
// Reduce step shared by both map-reduce passes. It merges every value emitted
// for one user id into a single document: demographic fields are copied to the
// top level and comments are collected in a `comments` array.
//
// k      - the key being reduced (the user id); not used by the merge itself.
// values - array of values for that key. Each one is either a demographic
//          record, a single raw comment (has a `comment` field), or a
//          previously reduced document (has a `comments` array).
// Returns the merged document.
reduce = function(k, values) {
    var result = {};
    // Fields that must never be copied to the top level of the result. The
    // `comments` array is listed as well: it is accumulated explicitly below,
    // and copying it would replace the accumulated array with the array of the
    // last partial result, dropping every comment collected before it.
    var skippedFields = {
        "commentId": '',
        "comment": '',
        "created": '',
        "comments": ''
    };
    values.forEach(function(value) {
        var field;
        if ("comment" in value) {
            // A single raw comment emitted by the comments map function.
            if (!("comments" in result)) {
                result.comments = [];
            }
            result.comments.push(value);
        } else if ("comments" in value) {
            // An already reduced document: append all of its comments.
            if (!("comments" in result)) {
                result.comments = [];
            }
            result.comments.push.apply(result.comments, value.comments);
        }
        for (field in value) {
            // Own-property check on the skip list so that inherited names such
            // as "toString" are not mistaken for excluded fields.
            if (value.hasOwnProperty(field) && !skippedFields.hasOwnProperty(field)) {
                result[field] = value[field];
            }
        }
    });
    return result;
};
// Run both passes into the same output collection. The "reduce" output mode
// is what lets the second pass be combined with the results of the first.
db.getCollection("users").mapReduce(mapUsers, reduce, {
    out: {reduce: "users_comments"}
});
db.getCollection("comments").mapReduce(mapComments, reduce, {
    out: {reduce: "users_comments"}
});
// Print the merged documents to inspect the resulting collection.
db.getCollection("users_comments").find().pretty();
Run Code Online (Sandbox Code Playgroud)
此时,您将有一个名为users_comments的新集合,其中包含合并后的数据,您现在可以使用它.这些归约后的集合都以您在map函数中emit的键作为_id,所有的值都是value键内的子对象——这些值并不在归约后文档的顶层.
这是一个有点简单的例子.您可以使用更多集合重复此操作,以便继续构建简化集合.您还可以在此过程中对数据进行摘要和聚合.可能您会定义多个reduce函数,因为聚合和保留现有字段的逻辑变得更加复杂.
您还会注意到,现在每个用户都有一个文档,其中包含该数组中所有该用户的注释.如果我们合并具有一对一关系而不是一对多关系的数据,那么它将是平坦的,您可以简单地使用如下的reduce函数:
// Flat (one-to-one) merge: copies every own field of every value into a
// single result object. When two values share a field name, the later value
// in the array wins.
reduce = function(k, values) {
    var merged = {};
    var i, key, current;
    for (i = 0; i < values.length; i++) {
        current = values[i];
        for (key in current) {
            if (Object.prototype.hasOwnProperty.call(current, key)) {
                merged[key] = current[key];
            }
        }
    }
    return merged;
};
Run Code Online (Sandbox Code Playgroud)
如果你想展平users_comments集合,使每条评论成为一个单独的文档,可以另外运行:
var map, reduce;
// Map step of the flattening pass. For a merged users_comments document
// (bound to `this`) it emits one record per comment, keyed by the comment id,
// carrying the comment text and creation time together with the user's id,
// country and age.
map = function() {
    var doc = this;
    var key;
    // Debug aid: print every top-level field of the document being mapped.
    for (key in doc) {
        print(key + ": " + doc[key]);
    }
    // Documents without a comments array produce no output.
    if (!("comments" in doc.value)) {
        return;
    }
    doc.value.comments.forEach(function(entry) {
        emit(entry.commentId, {
            userId: doc._id,
            country: doc.value.country,
            age: doc.value.age,
            comment: entry.comment,
            created: entry.created
        });
    });
};
// Reduce step of the flattening pass: folds all values emitted for one
// comment id into a single object by copying their own fields, with later
// values overriding earlier ones on a name clash.
reduce = function(k, values) {
    var flattened = {};
    values.forEach(function(entry) {
        var name;
        for (name in entry) {
            if (!entry.hasOwnProperty(name)) {
                continue;
            }
            flattened[name] = entry[name];
        }
    });
    return flattened;
};
// Run the flattening pass over the merged users_comments collection and write
// its output to the comments_with_demographics collection.
db.users_comments.mapReduce(map, reduce, {"out": "comments_with_demographics"});
Run Code Online (Sandbox Code Playgroud)
绝对不应该实时(在请求处理过程中)使用这种技术.它适合放在cron作业或类似的定期更新合并数据的任务中.您可能还希望在新集合上运行ensureIndex,以确保针对它的查询能够快速执行(请记住,您的数据仍然位于value键内,因此如果要对comments_with_demographics按评论的created时间建立索引,应当使用 db.comments_with_demographics.ensureIndex({"value.created": 1}); ).
Bru*_*ebs 117
MongoDB 3.2现在允许通过$lookup聚合阶段将来自多个集合的数据合并为一个.作为一个实际的例子,假设你有关于书籍的数据,分别存放在两个不同的集合中.
第一个集合,称为books,具有以下数据:
{
"isbn": "978-3-16-148410-0",
"title": "Some cool book",
"author": "John Doe"
}
{
"isbn": "978-3-16-148999-9",
"title": "Another awesome book",
"author": "Jane Roe"
}
Run Code Online (Sandbox Code Playgroud)
第二个集合称为books_selling_data具有以下数据:
{
"_id": ObjectId("56e31bcf76cdf52e541d9d26"),
"isbn": "978-3-16-148410-0",
"copies_sold": 12500
}
{
"_id": ObjectId("56e31ce076cdf52e541d9d28"),
"isbn": "978-3-16-148999-9",
"copies_sold": 720050
}
{
"_id": ObjectId("56e31ce076cdf52e541d9d29"),
"isbn": "978-3-16-148999-9",
"copies_sold": 1000
}
Run Code Online (Sandbox Code Playgroud)
合并两个集合只需要以下列方式使用$ lookup:
// Attach to every book the rows of books_selling_data that share its isbn.
// The matching rows are placed in an array field named copies_sold.
db.getCollection("books").aggregate([
    {
        $lookup: {
            from: "books_selling_data",
            localField: "isbn",
            foreignField: "isbn",
            as: "copies_sold"
        }
    }
])
Run Code Online (Sandbox Code Playgroud)
此聚合返回的结果如下所示(books集合本身不会被修改):
{
"isbn": "978-3-16-148410-0",
"title": "Some cool book",
"author": "John Doe",
"copies_sold": [
{
"_id": ObjectId("56e31bcf76cdf52e541d9d26"),
"isbn": "978-3-16-148410-0",
"copies_sold": 12500
}
]
}
{
"isbn": "978-3-16-148999-9",
"title": "Another awesome book",
"author": "Jane Roe",
"copies_sold": [
{
"_id": ObjectId("56e31ce076cdf52e541d9d28"),
"isbn": "978-3-16-148999-9",
"copies_sold": 720050
},
{
"_id": ObjectId("56e31ce076cdf52e541d9d29"),
"isbn": "978-3-16-148999-9",
"copies_sold": 1000
}
]
}
Run Code Online (Sandbox Code Playgroud)
重要的是要注意以下几点:
"from"集合(本例中为books_selling_data)不能是分片集合.因此,总结来说,如果你想合并这两个集合,并且在本例中得到一个扁平的copies_sold字段(表示售出的总册数),你还需要多做一些工作,可能需要借助一个中间集合,然后再通过$out输出到最终集合.
Hie*_* Le 13
如果mongodb中没有批量插入功能,我们可以遍历small_collection中的所有对象,并将它们逐个插入到big_collection:
// Copy every document of small_collection into big_collection, one insert
// per document.
db.getCollection("small_collection").find().forEach(function(doc) {
    db.getCollection("big_collection").insert(doc);
});
Run Code Online (Sandbox Code Playgroud)
Ani*_*wal 10
$ lookup的非常基本的例子.
// Join each user with its userinfo and userrole records (matched on userId)
// and flatten the two joined arrays.
db.users.aggregate([
    { $lookup: { from: "userinfo", localField: "userId", foreignField: "userId", as: "userInfoData" } },
    { $lookup: { from: "userrole", localField: "userId", foreignField: "userId", as: "userRoleData" } },
    // preserveNullAndEmptyArrays keeps users for which a lookup found nothing.
    { $unwind: { path: "$userInfoData", preserveNullAndEmptyArrays: true } },
    { $unwind: { path: "$userRoleData", preserveNullAndEmptyArrays: true } }
])
Run Code Online (Sandbox Code Playgroud)
这是用的
{ $unwind: { path: "$userInfoData", preserveNullAndEmptyArrays: true }},
{ $unwind: { path: "$userRoleData", preserveNullAndEmptyArrays: true }}
Run Code Online (Sandbox Code Playgroud)
代替
{ $unwind:"$userInfoData"}
{ $unwind:"$userRoleData"}
Run Code Online (Sandbox Code Playgroud)
因为如果$lookup没有找到匹配的记录,{ $unwind: "$userRoleData" }会把该文档从结果中丢弃,从而返回空结果(0条记录).
Xav*_*hot 10
从Mongo 4.4开始,我们可以把新的$unionWith聚合阶段与$group的新$accumulator运算符结合起来,在聚合管道中实现这种连接:
// > db.users.find()
// [{ user: 1, name: "x" }, { user: 2, name: "y" }]
// > db.books.find()
// [{ user: 1, book: "a" }, { user: 1, book: "b" }, { user: 2, book: "c" }]
// > db.movies.find()
// [{ user: 1, movie: "g" }, { user: 2, movie: "h" }, { user: 2, movie: "i" }]
// Merge users, books and movies into one document per user: the three
// collections are unioned into a single stream, grouped by the `user` field,
// and folded together with a custom JavaScript accumulator.
db.users.aggregate([
  { $unionWith: "books" },
  { $unionWith: "movies" },
  { $group: {
    _id: "$user",
    user: {
      $accumulator: {
        // Fields of each grouped record that are passed to accumulate().
        accumulateArgs: ["$name", "$book", "$movie"],
        // Initial state for a group: no name yet, empty book and movie lists.
        init: function() { return { books: [], movies: [] } },
        // Folds one record into the state. A record only carries the fields
        // of the collection it came from, so each argument may be missing.
        accumulate: function(user, name, book, movie) {
          if (name) user.name = name;
          if (book) user.books.push(book);
          if (movie) user.movies.push(movie);
          return user;
        },
        // Combines two partial states. concat() returns a new array instead
        // of modifying the receiver, so its result has to be assigned back;
        // otherwise the second state's books and movies are silently lost.
        merge: function(userV1, userV2) {
          if (userV2.name) userV1.name = userV2.name;
          userV1.books = userV1.books.concat(userV2.books);
          userV1.movies = userV1.movies.concat(userV2.movies);
          return userV1;
        },
        lang: "js"
      }
    }
  }}
])
// { _id: 1, user: { books: ["a", "b"], movies: ["g"], name: "x" } }
// { _id: 2, user: { books: ["c"], movies: ["h", "i"], name: "y" } }
Run Code Online (Sandbox Code Playgroud)
$unionWith在聚合管道中已有的文档中组合来自给定集合的记录。在 2 个联合阶段之后,我们就拥有了管道中的所有用户、书籍和电影记录。
然后在$group阶段,我们按$user对记录进行分组,并使用$accumulator运算符累积各项,它允许在文档分组时自定义累积方式:
我们要累积的字段通过accumulateArgs定义。init定义在对元素进行分组时将被累积的状态。accumulate函数允许针对每条被分组的记录执行自定义操作,以构建累积状态。例如,如果被分组的记录定义了book字段,那么我们就更新状态中的books部分。merge用于合并两个内部状态。它仅用于在分片集群上运行的聚合,或当操作超过内存限制时。
在聚合中对多个集合使用多个$lookup
查询:
// Load one service location and attach its orders, time-window types and
// service-time type from three other collections.
db.servicelocations.aggregate([
    // Restrict the pipeline to the requested service location id(s).
    { $match: { serviceLocationId: { $in: ["36728"] } } },
    // Orders that belong to the service location.
    { $lookup: { from: "orders", localField: "serviceLocationId", foreignField: "serviceLocationId", as: "orders" } },
    // Time-window types; the joined array is stored under the same
    // "timeWindow" name that the local field path starts with.
    { $lookup: { from: "timewindowtypes", localField: "timeWindow.timeWindowTypeId", foreignField: "timeWindowTypeId", as: "timeWindow" } },
    // Service-time type referenced by the location.
    { $lookup: { from: "servicetimetypes", localField: "serviceTimeTypeId", foreignField: "serviceTimeTypeId", as: "serviceTime" } },
    { $unwind: "$orders" },
    { $unwind: "$serviceTime" },
    { $limit: 14 }
])
Run Code Online (Sandbox Code Playgroud)
结果:
{
"_id" : ObjectId("59c3ac4bb7799c90ebb3279b"),
"serviceLocationId" : "36728",
"regionId" : 1.0,
"zoneId" : "DXBZONE1",
"description" : "AL HALLAB REST EMIRATES MALL",
"locationPriority" : 1.0,
"accountTypeId" : 1.0,
"locationType" : "SERVICELOCATION",
"location" : {
"makani" : "",
"lat" : 25.119035,
"lng" : 55.198694
},
"deliveryDays" : "MTWRFSU",
"timeWindow" : [
{
"_id" : ObjectId("59c3b0a3b7799c90ebb32cde"),
"timeWindowTypeId" : "1",
"Description" : "MORNING",
"timeWindow" : {
"openTime" : "06:00",
"closeTime" : "08:00"
},
"accountId" : 1.0
},
{
"_id" : ObjectId("59c3b0a3b7799c90ebb32cdf"),
"timeWindowTypeId" : "1",
"Description" : "MORNING",
"timeWindow" : {
"openTime" : "09:00",
"closeTime" : "10:00"
},
"accountId" : 1.0
},
{
"_id" : ObjectId("59c3b0a3b7799c90ebb32ce0"),
"timeWindowTypeId" : "1",
"Description" : "MORNING",
"timeWindow" : {
"openTime" : "10:30",
"closeTime" : "11:30"
},
"accountId" : 1.0
}
],
"address1" : "",
"address2" : "",
"phone" : "",
"city" : "",
"county" : "",
"state" : "",
"country" : "",
"zipcode" : "",
"imageUrl" : "",
"contact" : {
"name" : "",
"email" : ""
},
"status" : "ACTIVE",
"createdBy" : "",
"updatedBy" : "",
"updateDate" : "",
"accountId" : 1.0,
"serviceTimeTypeId" : "1",
"orders" : [
{
"_id" : ObjectId("59c3b291f251c77f15790f92"),
"orderId" : "AQ18O1704264",
"serviceLocationId" : "36728",
"orderNo" : "AQ18O1704264",
"orderDate" : "18-Sep-17",
"description" : "AQ18O1704264",
"serviceType" : "Delivery",
"orderSource" : "Import",
"takenBy" : "KARIM",
"plannedDeliveryDate" : ISODate("2017-08-26T00:00:00.000Z"),
"plannedDeliveryTime" : "",
"actualDeliveryDate" : "",
"actualDeliveryTime" : "",
"deliveredBy" : "",
"size1" : 296.0,
"size2" : 3573.355,
"size3" : 240.811,
"jobPriority" : 1.0,
"cancelReason" : "",
"cancelDate" : "",
"cancelBy" : "",
"reasonCode" : "",
"reasonText" : "",
"status" : "",
"lineItems" : [
{
"ItemId" : "BNWB020",
"size1" : 15.0,
"size2" : 78.6,
"size3" : 6.0
},
{
"ItemId" : "BNWB021",
"size1" : 20.0,
"size2" : 252.0,
"size3" : 11.538
},
{
"ItemId" : "BNWB023",
"size1" : 15.0,
"size2" : 285.0,
"size3" : 16.071
},
{
"ItemId" : "CPMW112",
"size1" : 3.0,
"size2" : 25.38,
"size3" : 1.731
},
{
"ItemId" : "MMGW001",
"size1" : 25.0,
"size2" : 464.375,
"size3" : 46.875
},
{
"ItemId" : "MMNB218",
"size1" : 50.0,
"size2" : 920.0,
"size3" : 60.0
},
{
"ItemId" : "MMNB219",
"size1" : 50.0,
"size2" : 630.0,
"size3" : 40.0
},
{
"ItemId" : "MMNB220",
"size1" : 50.0,
"size2" : 416.0,
"size3" : 28.846
},
{
"ItemId" : "MMNB270",
"size1" : 50.0,
"size2" : 262.0,
"size3" : 20.0
},
{
"ItemId" : "MMNB302",
"size1" : 15.0,
"size2" : 195.0,
"size3" : 6.0
},
{
"ItemId" : "MMNB373",
"size1" : 3.0,
"size2" : 45.0,
"size3" : 3.75
}
],
"accountId" : 1.0
},
{
"_id" : ObjectId("59c3b291f251c77f15790f9d"),
"orderId" : "AQ137O1701240",
"serviceLocationId" : "36728",
"orderNo" : "AQ137O1701240",
"orderDate" : "18-Sep-17",
"description" : "AQ137O1701240",
"serviceType" : "Delivery",
"orderSource" : "Import",
"takenBy" : "KARIM",
"plannedDeliveryDate" : ISODate("2017-08-26T00:00:00.000Z"),
"plannedDeliveryTime" : "",
"actualDeliveryDate" : "",
"actualDeliveryTime" : "",
"deliveredBy" : "",
"size1" : 28.0,
"size2" : 520.11,
"size3" : 52.5,
"jobPriority" : 1.0,
"cancelReason" : "",
"cancelDate" : "",
"cancelBy" : "",
"reasonCode" : "",
"reasonText" : "",
"status" : "",
"lineItems" : [
{
"ItemId" : "MMGW001",
"size1" : 25.0,
"size2" : 464.38,
"size3" : 46.875
},
{
"ItemId" : "MMGW001-F1",
"size1" : 3.0,
"size2" : 55.73,
"size3" : 5.625
}
],
"accountId" : 1.0
},
{
"_id" : ObjectId("59c3b291f251c77f15790fd8"),
"orderId" : "AQ110O1705036",
"serviceLocationId" : "36728",
"orderNo" : "AQ110O1705036",
"orderDate" : "18-Sep-17",
"description" : "AQ110O1705036",
"serviceType" : "Delivery",
"orderSource" : "Import",
"takenBy" : "KARIM",
"plannedDeliveryDate" : ISODate("2017-08-26T00:00:00.000Z"),
"plannedDeliveryTime" : "",
"actualDeliveryDate" : "",
"actualDeliveryTime" : "",
"deliveredBy" : "",
"size1" : 60.0,
"size2" : 1046.0,
"size3" : 68.0,
"jobPriority" : 1.0,
"cancelReason" : "",
"cancelDate" : "",
"cancelBy" : "",
"reasonCode" : "",
"reasonText" : "",
"status" : "",
"lineItems" : [
{
"ItemId" : "MMNB218",
"size1" : 50.0,
"size2" : 920.0,
"size3" : 60.0
},
{
"ItemId" : "MMNB219",
"size1" : 10.0,
"size2" : 126.0,
"size3" : 8.0
}
],
"accountId" : 1.0
}
],
"serviceTime" : {
"_id" : ObjectId("59c3b07cb7799c90ebb32cdc"),
"serviceTimeTypeId" : "1",
"serviceTimeType" : "nohelper",
"description" : "",
"fixedTime" : 30.0,
"variableTime" : 0.0,
"accountId" : 1.0
}
}
Run Code Online (Sandbox Code Playgroud)
在单个查询中,可以使用聚合和查找以“ SQL UNION”方式在MongoDB中进行联合。这是我测试过的可用于MongoDB 4.0的示例:
// Seed the employees collection used to demonstrate the union.
db.employees.insert({ name: "John", type: "employee", department: "sales" });
db.employees.insert({ name: "Martha", type: "employee", department: "accounting" });
db.employees.insert({ name: "Amy", type: "employee", department: "warehouse" });
db.employees.insert({ name: "Mike", type: "employee", department: "warehouse" });
// Seed the freelancers collection used to demonstrate the union.
db.freelancers.insert({ name: "Stephany", type: "freelancer", department: "accounting" });
db.freelancers.insert({ name: "Martin", type: "freelancer", department: "sales" });
db.freelancers.insert({ name: "Doug", type: "freelancer", department: "warehouse" });
db.freelancers.insert({ name: "Brenda", type: "freelancer", department: "sales" });
// Union of the sales employees and sales freelancers in one aggregation.
// The starting collection only has to contain at least one document; its
// content is discarded in the first two stages.
db.freelancers.aggregate([
    // Keep a single document and strip it, leaving one empty document
    // to hang the lookups on.
    { $limit: 1 },
    { $project: { _id: '$$REMOVE' } },
    // Pull each collection to be unioned into its own array field.
    {
        $lookup: {
            from: 'employees',
            pipeline: [{ $match: { department: 'sales' } }],
            as: 'employees'
        }
    },
    {
        $lookup: {
            from: 'freelancers',
            pipeline: [{ $match: { department: 'sales' } }],
            as: 'freelancers'
        }
    },
    // Concatenate the arrays into one.
    { $project: { union: { $concatArrays: ["$employees", "$freelancers"] } } },
    // Turn the array elements back into top-level result documents.
    { $unwind: '$union' },
    { $replaceRoot: { newRoot: '$union' } }
]);
Run Code Online (Sandbox Code Playgroud)
以下是其工作原理的说明:
在数据库中任意一个至少包含一个文档的集合上创建aggregate。如果不能保证数据库中一定存在非空的集合,则可以在数据库中创建一个“虚拟”集合来解决此问题,该集合中仅包含一个空文档,专门用于进行联合查询。
使管道的第一阶段成为{ $limit: 1 }。这将删除集合中除第一个文档外的所有文档。
通过使用$project阶段来剥离剩余文档的所有字段:
{ $project: { _id: '$$REMOVE' } }
Run Code Online (Sandbox Code Playgroud)
您的聚合现在只包含一个空文档。接下来为每个要合并的集合添加一个$lookup阶段。您可以使用pipeline字段进行特定的过滤,也可以将localField和foreignField留空(设为null)以匹配整个集合。
{ $lookup: { from: 'collectionToUnion1', pipeline: [...], as: 'Collection1' } },
{ $lookup: { from: 'collectionToUnion2', pipeline: [...], as: 'Collection2' } },
{ $lookup: { from: 'collectionToUnion3', pipeline: [...], as: 'Collection3' } }
Run Code Online (Sandbox Code Playgroud)现在,您有一个包含单个文档的聚合,该文档包含3个数组,如下所示:
{
Collection1: [...],
Collection2: [...],
Collection3: [...]
}
Run Code Online (Sandbox Code Playgroud)
然后,您可以使用一个$project阶段以及$concatArrays聚合运算符将它们合并到一个数组中:
{
"$project" :
{
"Union" : { $concatArrays: ["$Collection1", "$Collection2", "$Collection3"] }
}
}
Run Code Online (Sandbox Code Playgroud)
现在,您的聚合包含单个文档,其中有一个数组,数组内容是各集合的并集。剩下要做的是添加$unwind和$replaceRoot阶段,将数组拆分为单独的文档:
{ $unwind: "$Union" },
{ $replaceRoot: { newRoot: "$Union" } }
Run Code Online (Sandbox Code Playgroud)Voilà。现在,您有一个结果集,其中包含要合并在一起的集合。然后,您可以添加更多阶段以对其进行进一步过滤,排序,应用skip()和limit()。您想要的几乎任何东西。
lob*_*234 -3
您必须在应用程序层中执行此操作。如果您使用 ORM,它可以使用注释(或类似的东西)来提取其他集合中存在的引用。我只使用过Morphia,注释@Reference在查询时会获取引用的实体,因此我能够避免自己在代码中执行此操作。
| 归档时间: |
|
| 查看次数: |
255588 次 |
| 最近记录: |