Rob*_*sky 9 algorithm hadoop mapreduce
我正在研究类似于规范MapReduce示例的东西 - 单词计数,但有一个转折,我希望只获得前N个结果.
假设我在HDFS中有一组非常大的文本数据.有很多示例显示如何构建Hadoop MapReduce作业,该作业将为该文本中的每个单词提供单词计数.例如,如果我的语料库是:
"这是对测试数据的测试,也是测试数据的好方法"
标准MapReduce字数统计作业的结果集将是:
测试:3,a:2,这个:2,是:1等.
但如果我只想要得到我的整个数据集中使用的前3个字?
我仍然可以运行完全相同的标准MapReduce字数统计工作,然后只需准备好前三个结果,并且每个字都吐出计数,但这似乎有点低效,因为很多数据需要在洗牌阶段被移动.
我的想法是,如果这个样本足够大,并且数据随机且在HDFS中分布良好,那么每个Mapper都不需要将所有字数发送到Reducers,而只需要一些最重要的数据.所以如果一个mapper有这个:
a:8234,:5422,男:4352,...... 更多的话 ......,难得一见:1,怪词:1,等等
那么我想要做的只是将每个Mapper中的前100个左右的单词发送到Reducer阶段 - 因为"罕见"几乎没有机会在完成所有内容时突然进入前三名.这似乎可以节省带宽和减速器处理时间.
这可以在Combiner阶段完成吗?通常在洗牌阶段之前进行这种优化吗?
这是一个非常好的问题,因为你已经达到了Hadoop字数统计示例的低效率.
优化问题的技巧如下:
HashMap
在本地地图阶段进行基础分组,您也可以使用组合器.这可能看起来像这样,我使用的HashMultiSet
是Guava,它有助于实现一个很好的计数机制.
public static class WordFrequencyMapper extends
Mapper<LongWritable, Text, Text, LongWritable> {
private final HashMultiset<String> wordCountSet = HashMultiset.create();
@Override
protected void map(LongWritable key, Text value, Context context)
throws IOException, InterruptedException {
String[] tokens = value.toString().split("\\s+");
for (String token : tokens) {
wordCountSet.add(token);
}
}
Run Code Online (Sandbox Code Playgroud)
然后在清理阶段发出结果:
@Override
protected void cleanup(Context context) throws IOException,
InterruptedException {
Text key = new Text();
LongWritable value = new LongWritable();
for (Entry<String> entry : wordCountSet.entrySet()) {
key.set(entry.getElement());
value.set(entry.getCount());
context.write(key, value);
}
}
Run Code Online (Sandbox Code Playgroud)
因此,您已将单词分组到本地工作块中,从而通过使用一些RAM来减少网络使用.你也可以用a来做同样的事情Combiner
,但是它要分组 - 所以这比使用a更慢(特别是对于字符串!)HashMultiset
.
要获得前N个,您只需要将该局部的前N写入HashMultiset
输出收集器,并以正常方式在reduce侧汇总结果.这也为您节省了大量的网络带宽,唯一的缺点是您需要在清理方法中对字数统计元组进行排序.
代码的一部分可能如下所示:
Set<String> elementSet = wordCountSet.elementSet();
String[] array = elementSet.toArray(new String[elementSet.size()]);
Arrays.sort(array, new Comparator<String>() {
@Override
public int compare(String o1, String o2) {
// sort descending
return Long.compare(wordCountSet.count(o2), wordCountSet.count(o1));
}
});
Text key = new Text();
LongWritable value = new LongWritable();
// just emit the first n records
for(int i = 0; i < N, i++){
key.set(array[i]);
value.set(wordCountSet.count(array[i]));
context.write(key, value);
}
Run Code Online (Sandbox Code Playgroud)
希望你能得到在本地做同样多的话的要点,然后只收集前N个的前N个;)
引用托马斯
要获得前N个,您只需将该本地HashMultiset中的前N个写入输出收集器,并以正常方式在reduce侧汇总结果.这也为您节省了大量的网络带宽,唯一的缺点是您需要在清理方法中对字数统计元组进行排序.
如果你只在本地HashMultiset中写入前N个,那么你可能会错过一个元素的数量,如果从这个本地HashMultiset传递,它可能会成为整个前10个元素之一.
例如,将以下格式视为三个映射为MapName:elementName,elemenntcount:
地图A:Ele1,4:Ele2,5:Ele3,5:Ele4,2
地图B:Ele1,1:Ele5,7:Ele6,3:Ele7,6
地图C:Ele5,4:Ele8,3:Ele1,1:Ele9,3
现在,如果我们考虑每个映射器的前3个,我们将错过元素"Ele1",其总数应该是6但是由于我们计算每个映射器的前3个,我们看到"Ele1"的总计数为4.
我希望这是有道理的.请让我知道你对它的看法.