为什么MapReduce需要Reduce?如果计算书中单词数量这样的作业如果由单个进程执行或者在服务器场上进行MapReduced会导致相同的结果,那么是否有可能必须删除重复项?我假设Reduce步骤,至少在这个例子中,只是简化每个工作进程的结果并提供书中单词的总数.我不明白图片中有什么重复的东西.
美好的一天......我有点困惑; 减少任务和减少工作之间有什么区别?这是我的情况; 我已经读过,在所有映射完成之前,reduce不会启动...但是在hadoop输出中我看到了另外一个:
12/02/11 10:58:50 INFO mapred.JobClient: map 60% reduce 16%
12/02/11 10:58:54 INFO mapred.JobClient: map 60% reduce 20%
12/02/11 10:58:55 INFO mapred.JobClient: map 65% reduce 20%
Run Code Online (Sandbox Code Playgroud)
减少是16%,而地图仍然是60%......这里真的发生了什么?
我正在学习在hadoop集群上工作.我已经在hadoop流上工作了一段时间,我在perl/python中编写了map-reduce脚本并运行了这个工作.但是,我没有找到任何关于运行java map reduce作业的好解释.例如:我有以下程序 -
http://www.infosci.cornell.edu/hadoop/wordcount.html
有人可以告诉我如何编译这个程序并运行这个工作.
我想要一个小的文件传递给我使用的运行作业GenericOptionsParser的-files标志:
$ hadoop jar MyJob.jar -conf /path/to/cluster-conf.xml -files /path/to/local-file.csv data/input data/output
Run Code Online (Sandbox Code Playgroud)
这应该将作业发送到我的集群并附加local-file.csv以在需要时可用于Mapper/Reducer.当我在伪分布式模式下运行它时,它工作得很好,但是当我在集群上启动作业时,似乎无法找到该文件.我正在使用mapper的setup方法读取文件,如下所示:
public static class TheMapper extends Mapper<LongWritable, Text, Text, Text> {
@Override
public void setup(Context context) throws IOException, InterruptedException {
URI[] uriList = DistributedCache.getCacheFiles( context.getConfiguration() );
CsvReader csv = new CsvReader(uriList[0].getPath());
// work with csv file..
}
// ..
}
Run Code Online (Sandbox Code Playgroud)
当作业运行时,我得到以下异常:
java.io.FileNotFoundException: File /hdfs/tmp/mapred/staging/hduser/.staging/job_201205112311_011/files/local-file.csv does not exist.
at com.csvreader.CsvReader.<init>(Unknown Source)
at com.csvreader.CsvReader.<init>(Unknown Source)
at com.csvreader.CsvReader.<init>(Unknown Source)
at MyJob$TheMapper.setup(MyJob.java:167)
at org.apache.hadoop.mapreduce.Mapper.run(Mapper.java:142)
...
Run Code Online (Sandbox Code Playgroud)
知道我做错了什么吗?谢谢.
我注意到,通常当hadoop集群不忙时,在map side完成之前,减少一边开始进行?怎么可能?我记得在某个地方阅读减少进度指示器有点误导但不记得确切.有人可以对此有所了解吗?
将我们的应用程序转换为Python 2.7,配置多线程,并在app.yaml中引用mapreduce,就像这样......
- url: /mapreduce(/.*)?
script: mapreduce.main.app
#script: google.appengine.ext.mapreduce.main.app
login: admin
Run Code Online (Sandbox Code Playgroud)
并像这样调用mapreduce ...
control.start_map(
"FNFR",
"fnfr.fnfrHandler",
"mapreduce.input_readers.BlobstoreLineInputReader",
{"blob_keys": blobKey},
shard_count=32,
mapreduce_parameters={'done_callback': '/fnfrdone','blobKey': blobKey, 'userID':thisUserID})
Run Code Online (Sandbox Code Playgroud)
我们得到以下堆栈跟踪...
Traceback (most recent call last):
File "/base/python27_runtime/python27_lib/versions/1/google/appengine/runtime/wsgi.py", line 189, in Handle
handler = _config_handle.add_wsgi_middleware(self._LoadHandler())
File "/base/python27_runtime/python27_lib/versions/1/google/appengine/runtime/wsgi.py", line 241, in _LoadHandler
raise ImportError('%s has no attribute %s' % (handler, name))
ImportError: <module 'mapreduce.main' from '/base/data/home/apps/s~xxxxxxonline/2.361692533819432574/mapreduce/main.pyc'> has no attribute app
Run Code Online (Sandbox Code Playgroud)
我发现了一个SO参考(如何将我的app.yaml迁移到2.7?)但是从我的yaml可以看出,我想我已经尝试了所有组合来尝试解决它.谢谢.
我是Hadoop和Java的新手,我觉得有一些明显的东西我只是缺少了.如果这意味着什么,我正在使用Hadoop 1.0.3.
我使用hadoop的目的是获取一堆文件并一次解析一个文件(而不是逐行).每个文件将生成多个键值,但其他行的上下文很重要.键和值是多值/复合的,所以我已经为键实现了WritableCompare,为值实现了可写.因为每个文件的处理占用了一点CPU,我想保存映射器的输出,然后再运行多个reducer.
对于复合键,我跟着[http://stackoverflow.com/questions/12427090/hadoop-composite-key][1]
问题是,输出只是Java对象引用而不是复合键和值.例:
LinkKeyWritable@bd2f9730 LinkValueWritable@8752408c
我不确定问题是否与根本没有减少数据有关
这是我的主要课程:
public static void main(String[] args) throws Exception {
JobConf conf = new JobConf(Parser.class);
conf.setJobName("raw_parser");
conf.setOutputKeyClass(LinkKeyWritable.class);
conf.setOutputValueClass(LinkValueWritable.class);
conf.setMapperClass(RawMap.class);
conf.setNumMapTasks(0);
conf.setInputFormat(PerFileInputFormat.class);
conf.setOutputFormat(TextOutputFormat.class);
PerFileInputFormat.setInputPaths(conf, new Path(args[0]));
FileOutputFormat.setOutputPath(conf, new Path(args[1]));
JobClient.runJob(conf);
}
Run Code Online (Sandbox Code Playgroud)
而我的Mapper类:
公共类RawMap扩展MapReduceBase实现Mapper {
public void map(NullWritable key, Text value,
OutputCollector<LinkKeyWritable, LinkValueWritable> output,
Reporter reporter) throws IOException {
String json = value.toString();
SerpyReader reader = new SerpyReader(json);
GoogleParser parser = new GoogleParser(reader);
for (String page : reader.getPages()) {
String content = reader.readPageContent(page);
parser.addPage(content);
} …Run Code Online (Sandbox Code Playgroud)
如何使用hadoop mapreduce程序
在单个文件中消除重复值在输出中我需要唯一的值
例如:在文件
行1:嗨这是Ashok
第2
行:hadoop框架第3行的基础知识:这是Ashok
来自这个例子需要输出唯一的值,即它应该打印第1行和第3行...如何做...
我已经流式传输并将大约25万条推文保存到MongoDB中,在这里,我正在检索它,正如您所看到的,基于推文中出现的单词或关键字.
Mongo mongo = new Mongo("localhost", 27017);
DB db = mongo.getDB("TwitterData");
DBCollection collection = db.getCollection("publicTweets");
BasicDBObject fields = new BasicDBObject().append("tweet", 1).append("_id", 0);
BasicDBObject query = new BasicDBObject("tweet", new BasicDBObject("$regex", "autobiography"));
DBCursor cur=collection.find(query,fields);
Run Code Online (Sandbox Code Playgroud)
我想要做的是使用Map-Reduce并根据关键字对其进行分类并将其传递给reduce函数来计算每个类别下的推文数量,有点像你在这里看到的.在这个例子中,他正在计算页数,因为它是一个简单的数字.我想做的事情如下:
"if (this.tweet.contains("kword1")) "+
"category = 'kword1 tweets'; " +
"else if (this.tweet.contains("kword2")) " +
"category = 'kword2 tweets';
Run Code Online (Sandbox Code Playgroud)
然后使用reduce函数来获取计数,就像在示例程序中一样.
我知道语法不正确,但这就是我想做的事情.有没有办法实现它?谢谢!
PS:哦,我用Java编写代码.因此,Java语法将受到高度赞赏.谢谢!
发布的代码输出如下:
{ "tweet" : "An autobiography is a book that reveals nothing bad about its writer except his memory."}
{ "tweet" : "I refuse …Run Code Online (Sandbox Code Playgroud) 我们可以在同一个hadoop集群中同时使用Fair调度程序和Capacity Scheduler.哪种调度程序是好的和有效的.谁能帮我 ?