从reducer访问映射器的计数器

asd*_*sdf 12 java hadoop

我需要从我的reducer中的mapper访问计数器.这可能吗?如果是这样怎么办?

举个例子:我的映射器是:

public class CounterMapper extends Mapper<Text,Text,Text,Text> {

    static enum TestCounters { TEST }

    @Override
    protected void map(Text key, Text value, Context context)
                    throws IOException, InterruptedException {
        context.getCounter(TestCounters.TEST).increment(1);
        context.write(key, value);
    }
}
Run Code Online (Sandbox Code Playgroud)

我的减速机是

public class CounterReducer extends Reducer<Text,Text,Text,LongWritable> {

    @Override
    protected void reduce(Text key, Iterable<Text> values, Context context)
                        throws IOException, InterruptedException {
        Counter counter = context.getCounter(CounterMapper.TestCounters.TEST);
        long counterValue = counter.getValue();
        context.write(key, new LongWritable(counterValue));
    }
}
Run Code Online (Sandbox Code Playgroud)

counterValue总是0.我做错了什么或这是不可能的?

Jef*_*f G 11

在Reducer的configure(JobConf)中,您可以使用JobConf对象来查找reducer自己的作业ID.有了它,您的reducer可以创建自己的JobClient - 即与jobtracker的连接 - 并查询此作业的计数器(或任何相关的工作).

// in the Reducer class...
private long mapperCounter;

@Override
public void configure(JobConf conf) {
    JobClient client = new JobClient(conf);
    RunningJob parentJob = 
        client.getJob(JobID.forName( conf.get("mapred.job.id") ));
    mapperCounter = parentJob.getCounters().getCounter(MAP_COUNTER_NAME);
}
Run Code Online (Sandbox Code Playgroud)

现在您可以在reduce()方法本身中使用mapperCounter.

你真的需要一个try-catch.我正在使用旧的API,但它不应该很难适应新的API.

请注意,映射器的计数器应该在任何减速器开始之前完成,所以与Justin Thomas的评论相反,我相信你应该得到准确的值(只要减速器没有递增相同的计数器!)

  • @abhinavkulkarni实际上,**只有**减速器的shuffle阶段可以在所有映射器启动之前启动,这与计数器无关.因此,当减速器的减速阶段开始时,所有映射器计数器都是正确的.来自同一篇文章:"另一方面,排序和减少只能在所有映射器完成后才能启动." (2认同)

itz*_*aki 8

在新API上实施了Jeff G的解决方案:

    @Override
    public void setup(Context context) throws IOException, InterruptedException{
        Configuration conf = context.getConfiguration();
        Cluster cluster = new Cluster(conf);
        Job currentJob = cluster.getJob(context.getJobID());
        mapperCounter = currentJob.getCounters().findCounter(COUNTER_NAME).getValue();  
    }
Run Code Online (Sandbox Code Playgroud)

  • 我尝试了这个但是我在以下行mapperCounter = currentJob.getCounters().findCounter(COUNTER_NAME)中收到了一个java null point异常错误,我在那里用我的自定义计数器替换了COUNTER_NAME (2认同)

Jus*_*mas 2

Map/Reduce 的重点是并行化作业。将会有许多独特的映射器/化简器,因此除了运行映射/化简对之外,该值无论如何都不会正确。

他们有一个字数统计示例:

http://wiki.apache.org/hadoop/WordCount

您可以将 context.write(word,one) 更改为 context.write(line,one)