我需要从我的reducer中的mapper访问计数器.这可能吗?如果是这样怎么办?
举个例子:我的映射器是:
public class CounterMapper extends Mapper<Text,Text,Text,Text> {
static enum TestCounters { TEST }
@Override
protected void map(Text key, Text value, Context context)
throws IOException, InterruptedException {
context.getCounter(TestCounters.TEST).increment(1);
context.write(key, value);
}
}
Run Code Online (Sandbox Code Playgroud)
我的减速机是
public class CounterReducer extends Reducer<Text,Text,Text,LongWritable> {
@Override
protected void reduce(Text key, Iterable<Text> values, Context context)
throws IOException, InterruptedException {
Counter counter = context.getCounter(CounterMapper.TestCounters.TEST);
long counterValue = counter.getValue();
context.write(key, new LongWritable(counterValue));
}
}
Run Code Online (Sandbox Code Playgroud)
counterValue总是0.我做错了什么或这是不可能的?
Jef*_*f G 11
在Reducer的configure(JobConf)中,您可以使用JobConf对象来查找reducer自己的作业ID.有了它,您的reducer可以创建自己的JobClient - 即与jobtracker的连接 - 并查询此作业的计数器(或任何相关的工作).
// in the Reducer class...
private long mapperCounter;
@Override
public void configure(JobConf conf) {
JobClient client = new JobClient(conf);
RunningJob parentJob =
client.getJob(JobID.forName( conf.get("mapred.job.id") ));
mapperCounter = parentJob.getCounters().getCounter(MAP_COUNTER_NAME);
}
Run Code Online (Sandbox Code Playgroud)
现在您可以在reduce()方法本身中使用mapperCounter.
你真的需要一个try-catch.我正在使用旧的API,但它不应该很难适应新的API.
请注意,映射器的计数器应该在任何减速器开始之前完成,所以与Justin Thomas的评论相反,我相信你应该得到准确的值(只要减速器没有递增相同的计数器!)
在新API上实施了Jeff G的解决方案:
@Override
public void setup(Context context) throws IOException, InterruptedException{
Configuration conf = context.getConfiguration();
Cluster cluster = new Cluster(conf);
Job currentJob = cluster.getJob(context.getJobID());
mapperCounter = currentJob.getCounters().findCounter(COUNTER_NAME).getValue();
}
Run Code Online (Sandbox Code Playgroud)
Map/Reduce 的重点是并行化作业。将会有许多独特的映射器/化简器,因此除了运行映射/化简对之外,该值无论如何都不会正确。
他们有一个字数统计示例:
http://wiki.apache.org/hadoop/WordCount
您可以将 context.write(word,one) 更改为 context.write(line,one)