MongoDB增量mapReduce,只选择新文件,最后添加mapReduce后添加

Hit*_*osu 8 mapreduce mongodb

假设我有一个包含这样文档的集合(只是简化示例,但它应该显示该方案):

> db.data.find()
{ "_id" : ObjectId("4e9c1f27aa3dd60ee98282cf"), "type" : "A", "value" : 11 }
{ "_id" : ObjectId("4e9c1f33aa3dd60ee98282d0"), "type" : "A", "value" : 58 }
{ "_id" : ObjectId("4e9c1f40aa3dd60ee98282d1"), "type" : "B", "value" : 37 }
{ "_id" : ObjectId("4e9c1f50aa3dd60ee98282d2"), "type" : "B", "value" : 1 }
{ "_id" : ObjectId("4e9c1f56aa3dd60ee98282d3"), "type" : "A", "value" : 85 }
{ "_id" : ObjectId("4e9c1f5daa3dd60ee98282d4"), "type" : "B", "value" : 12 }
Run Code Online (Sandbox Code Playgroud)

现在我需要收集一些关于该集合的统计数据.例如:

db.data.mapReduce(function(){
        emit(this.type,this.value);
     },function(key,values){
        var total = 0;
        for(i in values) {total+=values[i]};
        return total;
     },
{out:'stat'})
Run Code Online (Sandbox Code Playgroud)

将收集'stat'集合中的总计.

> db.stat.find()
{ "_id" : "A", "value" : 154 }
{ "_id" : "B", "value" : 50 }
Run Code Online (Sandbox Code Playgroud)

在这一点上,一切都很完美,但我坚持下一步:

  1. 'data'集合不断更新新数据(旧文档保持不变,只插入,无更新)
  2. 我想定期更新'stat'集合,但不想每次都查询整个'data'集合,所以我选择运行增量mapReduce
  3. 在'data'集合中的每个插入更新'stat'集合并且不使用mapReduce似乎很好,但真实情况比这个例子更复杂,我想只按需获得统计数据.
  4. 为此,我应该只能查询在我上一次mapReduce之后添加的文档
  5. 据我所知,我不能依赖于ObjectId属性,只需存储最后一个属性,然后选择ObjectId>存储的每个文档,因为ObjectId不等于SQL数据库中的自动增量ID(例如,不同的分片将产生不同的ObjectIds).
  6. 我可以更改ObjectId生成器,但不确定如何在分片环境中更好地完成它

所以问题是:

是否可以选择仅在最后一个mapReduce运行增量mapReduce之后添加的文档,或者是否有另一种策略来更新不断增长的集合中的统计数据?

Xav*_* Ho 4

您可以缓存时间并将其用作下一次增量映射缩减的障碍。

我们正在工作中对此进行测试,并且似乎有效。如果我错了,请纠正我,但是当跨分片进行插入时,您无法安全地执行映射缩减。版本变得不一致,您的 Map-Reduce 操作将失败。(如果您找到解决方案,请告诉我!:)

我们改用批量插入,每 5 分钟一次。一旦完成所有批量插入,我们就像这样运行map-reduce(在Python中):

m = Code(<map function>)
r = Code(<reduce function>)

# pseudo code
end = last_time + 5 minutes

# Use time and optionally any other keys you need here
q = bson.SON([("date" : {"$gte" : last_time, "$lt" : end})])

collection.map_reduce(m, r, out=out={"reduce": <output_collection>}, query=q)
Run Code Online (Sandbox Code Playgroud)

请注意,我们使用了reduceand not merge,因为我们不想覆盖之前的内容;我们希望使用相同的reduce 函数将旧结果和新结果组合起来。