Dav*_*ack 7 mapreduce time-series mongodb mongodb-query aggregation-framework
是否可以使用MongoDB聚合框架生成时间序列输出,其中任何被认为属于每个存储桶的源文档都被添加到该存储桶中?
说我的收藏看起来像这样:
/*light_1 on from 10AM to 1PM*/
{
"_id" : "light_1",
"on" : ISODate("2015-01-01T10:00:00Z"),
"off" : ISODate("2015-01-01T13:00:00Z"),
},
/*light_2 on from 11AM to 7PM*/
{
"_id" : "light_2",
"on" : ISODate("2015-01-01T11:00:00Z"),
"off" : ISODate("2015-01-01T19:00:00Z")
}
Run Code Online (Sandbox Code Playgroud)
..我正在使用6小时的桶间隔来生成2015-01-01的报告.我希望我的结果看起来像:
{
"start" : ISODate("2015-01-01T00:00:00Z"),
"end" : ISODate("2015-01-01T06:00:00Z"),
"lights" : []
},
{
"start" : ISODate("2015-01-01T06:00:00Z"),
"end" : ISODate("2015-01-01T12:00:00Z"),
"lights_on" : ["light_1", "light_2"]
},
{
"start" : ISODate("2015-01-01T12:00:00Z"),
"end" : ISODate("2015-01-01T18:00:00Z"),
"lights_on" : ["light_1", "light_2"]
},
{
"start" : ISODate("2015-01-01T18:00:00Z"),
"end" : ISODate("2015-01-02T00:00:00Z"),
"lights_on" : ["light_2"]
}
Run Code Online (Sandbox Code Playgroud)
如果灯的"开启"值<桶'结束'和'关闭'值> =桶'开始',则灯在一个范围内被认为是'开启'
我知道我可以使用$ group和聚合日期运算符按开始或结束时间分组,但在这种情况下,它是一对一的映射.在这里,如果单个源文档跨越多个存储桶,则它可以使其成为多个时间段.
报告范围和间隔跨度直到运行时才知道.
您的目标需要考虑一下在何时将事件记录到给定时间段聚合中时记录事件的注意事项.显而易见的是,您所表示的单个文档实际上可以表示在最终聚合结果的"多个"时间段内报告的事件.
由于要查找的时间段,分析结果是聚合框架范围之外的问题.某些事件需要在可以分组的内容之外"生成",您应该能够看到这些事件.
为了做到这个"生成",你需要mapReduce.这通过JavaScript具有"流控制",因为它的处理语言能够基本上确定开/关之间的时间是否超过一个周期,因此发出它在多于一个周期中发生的数据.
作为旁注,"光"可能不是最适合的,_id因为它可能在给定的一天中多次打开/关闭.所以开/关的"实例"可能更好.但是我只是在这里跟随你的例子,所以要转移它然后只需用_id映射器代码中的引用替换任何实际字段代表灯的标识符.
但在代码上:
// start date and next date for query ( should be external to main code )
var oneHour = ( 1000 * 60 * 60 ),
sixHours = ( oneHour * 6 ),
oneDay = ( oneHour * 24 ),
today = new Date("2015-01-01"), // your input
tomorrow = new Date( today.valueOf() + oneDay ),
yesterday = new Date( today.valueOf() - sixHours ),
nextday = new Date( tomorrow.valueOf() + sixHours);
// main logic
db.collection.mapReduce(
// mapper to emit data
function() {
// Constants and round date to hour
var oneHour = ( 1000 * 60 * 60 )
sixHours = ( oneHour * 6 )
startPeriod = new Date( this.on.valueOf()
- ( this.on.valueOf() % oneHour )),
endPeriod = new Date( this.off.valueOf()
- ( this.off.valueOf() % oneHour ));
// Hour to 6 hour period and convert to UTC timestamp
startPeriod = startPeriod.setUTCHours(
Math.floor( startPeriod.getUTCHours() / 6) * 6 );
endPeriod = endPeriod.setUTCHours(
Math.floor( endPeriod.getUTCHours() / 6) * 6 );
// Init empty reults for each period only on first document processed
if ( counter == 0 ) {
for ( var x = startDay.valueOf(); x < endDay.valueOf(); x+= sixHours ) {
emit(
{ start: new Date(x), end: new Date(x + sixHours) },
{ lights_on: [] }
);
}
}
// Emit for every period until turned off only within the day
for ( var x = startPeriod; x <= endPeriod; x+= sixHours ) {
if ( ( x >= startDay ) && ( x < endDay ) ) {
emit(
{ start: new Date(x), end: new Date(x + sixHours) },
{ lights_on: [this._id] }
);
}
}
counter++;
},
// reducer to keep all lights in one array per period
function(key,values) {
var result = { lights_on: [] };
values.forEach(function(value) {
value.lights_on.forEach(function(light){
if ( result.lights_on.indexOf(light) == -1 )
result.lights_on.push(light);
});
});
result.lights_on.sort();
return result;
},
// options and query
{
"out": { "inline": 1 },
"query": {
"on": { "$gte": yesterday, "$lt": tomorrow },
"$or": [
{ "off": { "$gte:" today, "$lt": nextday } },
{ "off": null },
{ "off": { "$exists": false } }
]
},
"scope": {
"startDay": today,
"endDay": tomorrow,
"counter": 0
}
}
)
Run Code Online (Sandbox Code Playgroud)
本质上,"映射器"功能查看当前记录,将每个开/关时间四舍五入到几小时,然后计算出事件发生的六小时时段的开始时间.
利用这些新的日期值,启动循环以获取开始的"开启"时间并且在该时段期间在单个元件阵列内发出当前"灯"被打开的事件,如稍后所解释的.每个循环将开始周期增加六个小时,直到达到结束"关灯"时间.
它们出现在reducer函数中,它需要返回相同的预期输入,因此在值对象内的时间段内打开了灯光阵列.它在与这些值对象的列表相同的键下处理发出的数据.
首先迭代要减少的值列表,然后查看内部的灯光阵列,这些灯光可能来自之前的减少传递,并将每个光源处理成唯一光源的单个结果数组.简单地通过查找结果数组中的当前光值并推送到不存在的数组来完成.
注意"上一遍",好像你不熟悉mapReduce的工作方式,那么你应该理解reducer函数本身会发出一个结果,这个结果可能是通过处理"key"的"全部"可能值而得不到的.一次通过.它可以且通常仅处理键的发射数据的"子集",因此将以与从映射器发出的数据相同的方式将"减少"结果作为输入.
这一点设计就是为什么mapper和reducer都需要输出具有相同结构的数据,因为reducer本身也可以从先前已经减少的数据中获取它的输入.这就是mapReduce处理发出大量相同键值的大型数据集的方式.它通常以"块"处理,而不是一次处理.
结束减少归结为在此期间打开的灯光列表,每个周期开始和结束为发出的键.像这样:
{
"_id": {
"start": ISODate("2015-01-01T06:00:00Z"),
"end": ISODate("2015-01-01T12:00:00Z")
},
{
"result": {
"lights_on": [ "light_1", "light_2" ]
}
}
},
Run Code Online (Sandbox Code Playgroud)
那个"_id","result"结构只是所有mapReduce输出出来的属性,但所需的值都在那里.
现在还有一个关于查询选择的注释,需要考虑到灯在当天开始之前的某个日期通过其收集条目已经"开启".同样如此,它也可以在报告当前日期之后"关闭",并且实际上可能在null文档中具有值或没有"关闭"键,具体取决于数据的存储方式和当天实际上是被观察到的.
该逻辑从报告的一天开始创建一些必要的计算,并考虑该日期之前和之后的六小时时段,其中列出了查询条件:
{
"on": { "$gte": yesterday, "$lt": tomorrow },
"$or": [
{ "off": { "$gte:" today, "$lt": nextday } },
{ "off": null },
{ "off": { "$exists": false } }
]
}
Run Code Online (Sandbox Code Playgroud)
基本选择那里使用的范围运营商$gte和$lt找到大于或等于且小于分别关于它们,以便在适当的范围内找到的数据检测的值的字段中的值.
在该$or条件下,考虑"关闭"值的各种可能性.要么它落在范围标准之内,要么null通过$exists操作员在文档中具有值或可能没有键.这取决于你实际上如何表示"关闭",其中尚未关闭那些条件的要求$or,但这些将是合理的假设.
与所有MongoDB查询一样,除非另有说明,否则所有条件都是"AND"条件.
这仍然有些瑕疵,具体取决于可能需要打开灯的时间长短.但是这些变量都是有意在外部列出的,用于调整您的需求,同时考虑在报告日期之前或之后获取的预期持续时间.
另一个注意事项是数据本身可能没有任何事件在给定时间段内显示灯亮.出于这个原因,mapper函数中嵌入了一个简单的方法,用于查看我们是否在第一次结果迭代中.
仅在第一次时,会发出一组可能的周期键,其中包括每个周期中打开的灯的空数组.这允许报告还显示那些根本没有亮灯的时段,因为它被插入到发送到减速器和输出的数据中.
您可能会对此方法有所不同,因为它仍然依赖于某些数据符合查询条件以输出任何内容.因此,为了迎合没有记录数据或符合标准的真正"空白日",那么创建一个显示灯光空结果的键的外部哈希表可能会更好.然后将mapReduce操作的结果"合并"到那些预先存在的键中以生成报告.
这里有很多关于日期的计算,并且没有意识到实际的最终语言实现,我只是单独声明在实际mapReduce操作外部工作的任何东西.所以看起来像重复的任何东西都是针对那个意图完成的,使逻辑语言的那一部分独立.大多数编程语言都支持根据使用的方法操作日期的功能.
然后,所有语言特定的输入作为mapReduce方法的最后一个参数显示的选项块传入.值得注意的是,查询中包含了参数值,这些值都是根据要报告的日期计算出来的.然后是"范围",这是一种传递mapReduce操作中的函数可以使用的值的方法.
考虑到这些因素,mapper和reducer的JavaScript代码保持不变,因为这是该方法作为输入所期望的.进程的任何变量都由范围和查询结果提供,以便在不更改代码的情况下获得结果.
因此,主要是因为"光照射"的持续时间可以跨越不同的时期来报告,这成为聚合框架不能设计的事情.它无法执行获取结果所需的"循环"和"数据发送",因此我们使用mapReduce代替此任务.
那说,很好的问题.我不知道你是否已经考虑过如何在这里实现结果的概念,但至少现在有一个指南可以解决类似问题.