MongoDB的mapReduce:分区键到单个reducer并影响键排序

Dyi*_*yin 5 mapreduce mongodb

我真的陷入困境,我必须强制执行mapReduce框架只使用一个reducer一个特定的密钥.另外,我想影响框架如何对键进行排序.我将在一个例子中介绍这个问题:

我想以下面的形式发出键值对:

< bxb >:<d1>
< bx >:<d2>
< b >:<d3>
< bax >:<d2,d3>
图1

关键是一个序列 - 如你所见 - 每个都以一个项目b开始,它是一个数据类型string.ObjectID如字母d和数字所示,值将是s .我从map函数中发出了其他键值对,它们从键中的不同项开始,例如ax:

< abx >:<d1>
< ax >:<d3>
< xaa >:<d3>
图2

我需要强制框架reduce为每个键值对调用一个函数,该函数以特定项开始.此外,我必须强制在反向词典顺序中对键进行排序mapreduce排序.因此,单个reducer将为项b接收以下键值对:

< bxb >:<d1>
< bx >:<d2>
< bax >:<d2,d3>
< b >:<d3>
图3

我尝试过的:

我试图以下面的形式发出键值对:

b:<(d1:< bxb >)>
b:<(d2:< bx >)>
b:<(d3:< b >)>
b:<(d2:< bax >),(d3:< bax > )>
图4

这样一个reducer接收到项b的值,但是你没有看到反向词典顺序,最糟糕的是,不能保证单个reducer可以得到特定键的所有值(如MongoDB的MapReduce)文件说明).

基本上:我必须以反向字典顺序处理以特定项开头的这些序列.

我没有想法会让我进一步解决问题.如何为密钥强制执行单个reducer并影响排序?我应该如何设计传递(发出)的数据结构以满足我的需求?

这些功能类似于Hadoop ComparatorPartitioner.

更新------------------------------------------------- -------------------------------------------------- ---------------------

Asya Kamsky向我指出了这一点 finalize每个键只运行一次,所以它解决了分区问题,当一个减速器必须看到特定键的每个值时.

排序仍然是一个问题.对于大型数据集,在内部实现我自己的排序finalize意味着执行时间方面的巨大瓶颈,而我没有在map和之间使用自然排序机制reduce.密钥是数据类型string,但很容易将它们编码为负数integers以强制反向排序.

让我们再次检查图3:

< bxb >:<d1>
< bx >:<d2>
< bax >:<d2,d3>
< b >:<d3>
图3

这就是finalize关键b必须得到的东西.例如,键< b x b >是复合的.Finalize需要接收以b开头的键,但是对于键的其他部分,以反向字典顺序.

有没有办法实现这一点,避免内部排序finalize

Asy*_*sky 3

您可以做的是“正常”发出文档,并使用reduce 将所有发出的值合并到一个排序数组中。然后使用finalize方法来执行您要在单个减速器中执行的任何处理。

MongoDB的reduce函数可以被多次调用,但也可以从不被调用(如果特定键只发出单个值)。使用finalize可以解决这两个问题,因为每个键只调用一次。

样本数据:

> db.sorts.find()
{ "_id" : 1, "b" : 1, "a" : 20 }
{ "_id" : 2, "b" : 1, "a" : 2 }
{ "_id" : 3, "b" : 2, "a" : 12 }
{ "_id" : 4, "b" : 3, "a" : 1 }
{ "_id" : 5, "b" : 2, "a" : 1 }
{ "_id" : 6, "b" : 3, "a" : 11 }
{ "_id" : 7, "b" : 3, "a" : 5 }
{ "_id" : 8, "b" : 2, "a" : 1 }
{ "_id" : 9, "b" : 1, "a" : 15 }
Run Code Online (Sandbox Code Playgroud)

地图功能:

map = function() {
   emit( this.b, { val: [ this.a ] } );
}
Run Code Online (Sandbox Code Playgroud)

Reduce 函数通过遍历数组将新传入的 val 添加到已排序的数组中:

reduce = function( key, values) {
   var result = { val: [ ] };
   values.forEach(function(v) {
      var newval = v.val[0];
      var added = false;
      for (var i=0; i < result.val.length; i++) {
           if (newval < result.val[i]) {
                 result.val.splice(i, 0, newval);
                 added=true;
                 break;
           }
      }
      if ( !added ) {
         result.val.splice(result.val.length, 0, newval);
      }
   });
   return result;
}
Run Code Online (Sandbox Code Playgroud)

Finalize 仅返回一个简单的数组:

finalize = function( key, values ) {
   // values is document with a sorted array
   // do your "single reduce" functionality here
   return values.val;
}
Run Code Online (Sandbox Code Playgroud)

运行MapReduce:

> db.sorts.mapReduce(map, reduce, {out:"outs", finalize:finalize})
{
    "result" : "outs",
    "timeMillis" : 10,
    "counts" : {
        "input" : 9,
        "emit" : 9,
        "reduce" : 3,
        "output" : 3
    },
    "ok" : 1,
}
Run Code Online (Sandbox Code Playgroud)

结果是:

> db.outs.find()
{ "_id" : 1, "value" : [  2,  15,  20 ] }
{ "_id" : 2, "value" : [  1,  1,  12 ] }
{ "_id" : 3, "value" : [  1,  5,  11 ] }
Run Code Online (Sandbox Code Playgroud)