在MapReduce中,如何在处理X记录后停止减速器

0 hadoop mapreduce reducers mapper

我正在使用Mapper加载大量数据,这些数据具有执行时间和与之关联的大型查询.我只需要找到1000个最昂贵的查询,所以我将执行时间作为我输出的关键字输入映射器.我使用1个reducer,只想写1000条记录,减速机停止处理.

如果(count <1000){context.write(key,value)},我可以有一个全局计数器并执行此操作

但这仍将加载所有数十亿条记录,然后不再写入.

我希望在吐出1000条记录后停止减速机.通过避免下一组记录的搜索时间和读取时间.

这可能吗??

Chr*_*ite 7

您可以通过覆盖方法的默认实现来完全缩短reducer Reducer.run():

public void run(Context context) throws IOException, InterruptedException {
  setup(context);
  while (context.nextKey()) {
    reduce(context.getCurrentKey(), context.getValues(), context);
  }
  cleanup(context);
}
Run Code Online (Sandbox Code Playgroud)

您应该能够修改while循环以包括您的计数器,如下所示:

public void run(Context context) throws IOException, InterruptedException {
  setup(context);
  int count = 0;
  while (context.nextKey() && count++ < 1000) {
    reduce(context.getCurrentKey(), context.getValues(), context);
  }
  cleanup(context);
}
Run Code Online (Sandbox Code Playgroud)

并不是说这不一定会输出最前面的记录,而只是输出前1000个键控记录(如果你的reduce实现输出的数据超过单个记录则不会起作用 - 在这种情况下你可以在reduce方法中增加计数器)