Geo*_*rge 5 java distributed hadoop mapreduce
我是Hadoop的新手,我只是运行wordCount示例:http://hadoop.apache.org/common/docs/r0.18.2/mapred_tutorial.html
假设我们有一个包含3个文件的文件夹.我希望每个文件都有一个映射器,这个映射器只计算行数并将其返回到reducer.
然后,reducer将输入每个映射器的行数作为输入,并将所有3个文件中存在的总行数作为输出.
所以,如果我们有以下3个文件
input1.txt
input2.txt
input3.txt
Run Code Online (Sandbox Code Playgroud)
并且映射器返回:
mapper1 -> [input1.txt, 3]
mapper2 -> [input2.txt, 4]
mapper3 -> [input3.txt, 9]
Run Code Online (Sandbox Code Playgroud)
减速器将输出
3+4+9 = 16
Run Code Online (Sandbox Code Playgroud)
我在一个简单的java应用程序中完成了这个,所以我想在Hadoop中完成它.我只有一台计算机,并希望尝试在伪分布式环境中运行.
我怎样才能实现这个目标?我应该采取什么适当的措施?
我的代码应该在apache的示例中看起来像那样吗?我将有两个静态类,一个用于mapper,一个用于reducer?或者我应该有3个类,每个映射器一个?
如果你能指导我完成这个,我不知道如何做到这一点,我相信如果我设法编写一些代码来做这些东西,那么我将来能够编写更复杂的应用程序.
谢谢!
Chr*_*ite 11
除了sa125的回答,您可以通过巨大的发光不能为每个输入记录的记录提高性能,而只是堆积在映射一个计数器,然后在映射器清理法,发出文件名和计数值:
public class LineMapper extends Mapper<LongWritable, Text, Text, LongWritable> {
protected long lines = 0;
@Override
protected void cleanup(Context context) throws IOException,
InterruptedException {
FileSplit split = (FileSplit) context.getInputSplit();
String filename = split.getPath().toString();
context.write(new Text(filename), new LongWritable(lines));
}
@Override
protected void map(LongWritable key, Text value, Context context)
throws IOException, InterruptedException {
lines++;
}
}
Run Code Online (Sandbox Code Playgroud)
我注意到你使用的是0.18版本的文档.这是1.0.2(最新)的链接.
第一个建议 - 使用IDE(eclipse,IDEA等).填补空白真的很有帮助.
在实际的HDFS中,您无法知道文件的每个部分所在的位置(不同的计算机和群集).没有任何保证行X甚至与行Y驻留在同一磁盘上.也不能保证行X不会在不同的机器上分割(HDFS以块的形式分配数据,通常每块64Mb).这意味着您不能假设相同的映射器将处理整个文件.您可以确保每个文件都由同一个reducer处理.
由于reducer对于映射器发送的每个键都是唯一的,所以我这样做的方法是使用文件名作为映射器中的输出键.此外,映射器的默认输入类是TextInputFormat,这意味着每个映射器将自己接收整行(由LF或CR终止).然后,您可以从映射器中发出文件名和数字1(或者其他与计算无关的内容).然后,在reducer中,您只需使用一个循环来计算接收文件名的次数:
public static class Map extends Mapper<IntWritable, Text, Text, Text> {
public void map(IntWritable key, Text value, Context context) {
// get the filename
InputSplit split = context.getInputSplit();
String fileName = split.getPath().getName();
// send the filename to the reducer, the value
// has no meaning (I just put "1" to have something)
context.write( new Text(fileName), new Text("1") );
}
}
Run Code Online (Sandbox Code Playgroud)
public static class Reduce extends Reducer<Text, Text, Text, Text> {
public void reduce(Text fileName, Iterator<Text> values, Context context) {
long rowcount = 0;
// values get one entry for each row, so the actual value doesn't matter
// (you can also get the size, I'm just lazy here)
for (Text val : values) {
rowCount += 1;
}
// fileName is the Text key received (no need to create a new object)
context.write( fileName, new Text( String.valueOf( rowCount ) ) );
}
}
Run Code Online (Sandbox Code Playgroud)
您几乎可以使用与wordcount示例相同的驱动程序 - 请注意,我使用了新的mapreduce API,因此您需要调整一些内容(Job而不是JobConf等).当我读到它时,这真的很有帮助.
请注意,您的MR输出将只是每个文件名及其行数:
input1.txt 3
input2.txt 4
input3.txt 9
Run Code Online (Sandbox Code Playgroud)
如果您只想计算所有文件中的TOTAL行数,只需在所有映射器中发出相同的键(而不是文件名).这样,只有一个reducer可以处理所有行计数:
// no need for filename
context.write( new Text("blah"), new Text("1") );
Run Code Online (Sandbox Code Playgroud)
您还可以链接一个工作,该工作将处理每个文件行数的输出,或者做其他奇特的工作 - 这取决于您.
我留下了一些样板代码,但基础知识就在那里.一定要检查我,因为我从记忆中输入了大部分内容.. :)
希望这可以帮助!
| 归档时间: |
|
| 查看次数: |
8231 次 |
| 最近记录: |