I read Hadoop in Action and saw that with the Java MultipleOutputFormat and MultipleOutputs classes we can write the reduce output to multiple files, but I am not sure how to achieve the same thing with Python streaming.
For example:
                  / out1/part-0000
mapper -> reducer
                  \ out2/part-0000
If anyone knows of, has heard of, or has done something similar, please let me know.
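As far as I know, plain streaming gives the Python process no direct handle on MultipleOutputs; a common workaround is to tag each record with its intended destination in the reducer and split the tagged output in a follow-up step. A minimal sketch, assuming tab-separated key/value records and a made-up routing rule:

#!/usr/bin/env python
import sys

# Reducer sketch: prefix every record with the file it should end up in.
# The even/odd routing rule below is made up; replace it with whatever
# actually decides where a record belongs.
for line in sys.stdin:
    key, value = line.rstrip("\n").split("\t", 1)
    dest = "out1" if hash(key) % 2 == 0 else "out2"
    print "%s\t%s\t%s" % (dest, key, value)

A second pass (for example a job whose mapper filters on the first field) can then separate the out1 and out2 records into their own directories.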
I have a file where each line contains a string, then a space, then a number.
Example:
Line1: Word 2
Line2: Word1 8
Line3: Word2 1
I need to sort the numbers in descending order and then put the result in a file, assigning a rank to each number. So my output should be a file in the following format:
Line1: Word1 8 1
Line2: Word 2 2
Line3: Word2 1 3
Does anyone have an idea how I can do this in Hadoop? I am using Java with Hadoop.
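For what it is worth, the ranking step itself is small; here it is sketched as a streaming-style Python script (the question uses Java, but the logic carries over), assuming a single reducer so that the ordering, and therefore the ranks, are global:

#!/usr/bin/env python
import sys

# Collect (word, number) pairs; the last whitespace-separated field is
# taken to be the number, so the word part may itself contain spaces.
rows = []
for line in sys.stdin:
    word, num = line.strip().rsplit(None, 1)
    rows.append((word, int(num)))

# Sort by number descending, then emit "word number rank".
rows.sort(key=lambda r: r[1], reverse=True)
for rank, (word, num) in enumerate(rows, 1):
    print "%s %d %d" % (word, num, rank)

In Java MapReduce the usual equivalent is a descending comparator on the numeric key plus a single reduce task that assigns ranks from a running counter.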
我正在使用"hadoop-0.20.203.0rc1.tar.gz"进行群集设置.每当我设定job.setMapOutputKeyClass(ByteBuffer.class);
并运行我得到以下异常的工作:
12/01/13 15:09:00 INFO mapred.JobClient: Task Id : attempt_201201131428_0005_m_000001_2, Status : FAILED
java.lang.ClassCastException: class java.nio.ByteBuffer
at java.lang.Class.asSubclass(Class.java:3018)
at org.apache.hadoop.mapred.JobConf.getOutputKeyComparator(JobConf.java:776)
at org.apache.hadoop.mapred.MapTask$MapOutputBuffer.<init>(MapTask.java:958)
at org.apache.hadoop.mapred.MapTask$NewOutputCollector.<init>(MapTask.java:673)
at org.apache.hadoop.mapred.MapTask.runNewMapper(MapTask.java:755)
at org.apache.hadoop.mapred.MapTask.run(MapTask.java:369)
at org.apache.hadoop.mapred.Child$4.run(Child.java:259)
at java.security.AccessController.doPrivileged(Native Method)
at javax.security.auth.Subject.doAs(Subject.java:396)
at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1059)
at org.apache.hadoop.mapred.Child.main(Child.java:253)
Also, I noticed that ByteBuffer implements Comparable but not Writable; does that make any difference? Let me know if any other information is needed.
I am trying to start my hadoop application, but at startup I see the following in the log file. Does anyone know what the problem is?
Creating file system for hdfs://10.170.4.141:9000
java.io.IOException: config()
at org.apache.hadoop.conf.Configuration.<init>(Configuration.java:229)
at org.apache.hadoop.conf.Configuration.<init>(Configuration.java:216)
at org.apache.hadoop.security.SecurityUtil.<clinit>(SecurityUtil.java:60)
at org.apache.hadoop.net.NetUtils.makeSocketAddr(NetUtils.java:188)
at org.apache.hadoop.net.NetUtils.createSocketAddr(NetUtils.java:168)
at org.apache.hadoop.hdfs.server.namenode.NameNode.getAddress(NameNode.java:198)
at org.apache.hadoop.hdfs.DistributedFileSystem.initialize(DistributedFileSystem.java:88)
at org.apache.hadoop.fs.FileSystem.createFileSystem(FileSystem.java:1413)
at org.apache.hadoop.fs.FileSystem.access$200(FileSystem.java:68)
at org.apache.hadoop.fs.FileSystem$Cache.get(FileSystem.java:1431)
at org.apache.hadoop.fs.FileSystem.get(FileSystem.java:256)
at org.apache.hadoop.fs.FileSystem.get(FileSystem.java:125)
at org.apache.hadoop.fs.FileSystem.get(FileSystem.java:240)
at org.apache.hadoop.fs.Path.getFileSystem(Path.java:187)
at org.apache.hadoop.mapreduce.lib.input.FileInputFormat.addInputPath(FileInputFormat.java:372)
at org.blismedia.VolumeReportGenerateUpdates.main(VolumeReportGenerateUpdates.java:156)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:39)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:25)
at java.lang.reflect.Method.invoke(Method.java:597)
at org.apache.hadoop.util.RunJar.main(RunJar.java:187)
When trying to use MongoDB as an input RDD, I get "java.lang.IllegalStateException: not ready" in org.bson.BasicBSONDecoder._decode:
// Point the mongo-hadoop input format at the source collection,
// then load it through Spark's new Hadoop API as (key, BSON document) pairs.
Configuration conf = new Configuration();
conf.set("mongo.input.uri", "mongodb://127.0.0.1:27017/test.input");
JavaPairRDD<Object, BSONObject> rdd = sc.newAPIHadoopRDD(conf, MongoInputFormat.class, Object.class, BSONObject.class);
System.out.println(rdd.count());
The exception I get is:
14/08/06 09:49:57 INFO rdd.NewHadoopRDD: Input split:
MongoInputSplit{URI=mongodb://127.0.0.1:27017/test.input, authURI=null, min={ "_id" : { "$oid" : "53df98d7e4b0a67992b31f8d"}}, max={ "_id" : { "$oid" : "53df98d7e4b0a67992b331b8"}}, query={ }, sort={ }, fields={ }, notimeout=false}
14/08/06 09:49:57 WARN scheduler.TaskSetManager: Loss was due to java.lang.IllegalStateException
java.lang.IllegalStateException: not ready
at org.bson.BasicBSONDecoder._decode(BasicBSONDecoder.java:139)
at org.bson.BasicBSONDecoder.decode(BasicBSONDecoder.java:123)
at com.mongodb.hadoop.input.MongoInputSplit.readFields(MongoInputSplit.java:185)
at org.apache.hadoop.io.ObjectWritable.readObject(ObjectWritable.java:285)
at org.apache.hadoop.io.ObjectWritable.readFields(ObjectWritable.java:77)
at org.apache.spark.SerializableWritable.readObject(SerializableWritable.scala:42)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:88)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:55)
at java.lang.reflect.Method.invoke(Method.java:618)
at java.io.ObjectStreamClass.invokeReadObject(ObjectStreamClass.java:1089) …

I created a simple map reduce job in Python just to test the os.environ['mapreduce_map_input_file'] call, as follows:
map.py
#!/usr/bin/python
import sys

# input comes from STDIN (stream data that goes to the program)
for line in sys.stdin:
    l = line.strip().split()
    for word in l:
        # output goes to STDOUT (stream data that the program writes)
        print "%s\t%d" % (word, 1)
reduce.py
#!/usr/bin/python
import sys
import os

current_word = None
current_sum = 0

# input comes from STDIN (stream data that goes to the program)
for line in sys.stdin:
    word, count = line.strip().split("\t", …
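For reference, a minimal sketch of reading the input file name inside a streaming mapper; checking both the new and the old spelling of the variable is a common safeguard across Hadoop versions:

#!/usr/bin/python
import os
import sys

# Hadoop exposes job properties to streaming tasks as environment
# variables with dots replaced by underscores, so
# mapreduce.map.input.file becomes mapreduce_map_input_file
# (older releases used map.input.file / map_input_file).
input_file = os.environ.get('mapreduce_map_input_file') \
    or os.environ.get('map_input_file', 'unknown')

for line in sys.stdin:
    for word in line.strip().split():
        print "%s\t%s\t%d" % (input_file, word, 1)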
I have many thousands of small files in S3 that I would like to merge together. I used s3distcp and it does merge the files, but the merged result does not contain newlines between records.

I would like to know whether s3distcp has an option to force newline insertion, or whether there is another way to accomplish this without modifying the source files directly (or copying them and doing the same).
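Assuming s3distcp has no such flag, one workaround sketch is to run the files through a trivial streaming pass before merging. TextInputFormat treats end-of-file as the end of a record even when the trailing newline is missing, so an identity mapper that re-emits each record restores the newlines:

#!/usr/bin/env python
import sys

# Identity mapper: re-emit every record with exactly one trailing
# newline, so the merged output keeps one record per line.
for line in sys.stdin:
    sys.stdout.write(line.rstrip("\n") + "\n")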
I am trying to run a very simple task with mapreduce.
mapper.py:
#!/usr/bin/env python
import sys

for line in sys.stdin:
    print line
My txt file:
qwerty
asdfgh
zxc
Command line to run the job:
hadoop jar /usr/lib/hadoop-0.20-mapreduce/contrib/streaming/hadoop-streaming-2.6.0-mr1-cdh5.8.0.jar \
-input /user/cloudera/In/test.txt \
-output /user/cloudera/test \
-mapper /home/cloudera/Documents/map.py \
-file /home/cloudera/Documents/map.py
Error:
INFO mapreduce.Job: Task Id : attempt_1490617885665_0008_m_000001_0, Status : FAILED
Error: java.lang.RuntimeException: PipeMapRed.waitOutputThreads(): subprocess failed with code 127
at org.apache.hadoop.streaming.PipeMapRed.waitOutputThreads(PipeMapRed.java:325)
at org.apache.hadoop.streaming.PipeMapRed.mapRedFinished(PipeMapRed.java:538)
at org.apache.hadoop.streaming.PipeMapper.close(PipeMapper.java:130)
at org.apache.hadoop.mapred.MapRunner.run(MapRunner.java:61)
at org.apache.hadoop.streaming.PipeMapRunner.run(PipeMapRunner.java:34)
at org.apache.hadoop.mapred.MapTask.runOldMapper(MapTask.java:453)
at org.apache.hadoop.mapred.MapTask.run(MapTask.java:343)
at org.apache.hadoop.mapred.YarnChild$2.run(YarnChild.java:164)
at java.security.AccessController.doPrivileged(Native Method)
at javax.security.auth.Subject.doAs(Subject.java:415)
at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1693)
at org.apache.hadoop.mapred.YarnChild.main(YarnChild.java:158)
How can I fix this and run the code? When I use cat /home/cloudera/Documents/test.txt | python …
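Exit code 127 means the task's shell could not find the program to run; with a Python mapper this usually points at the shebang, for example Windows line endings turning the interpreter path into "python\r". A quick check, assuming the script path from the question:

#!/usr/bin/env python
# Prints True if map.py contains Windows line endings, which would turn
# the shebang into "#!/usr/bin/python\r" and break it on the cluster nodes.
with open('/home/cloudera/Documents/map.py', 'rb') as f:
    print '\r\n' in f.read()

Making the script executable (chmod +x) and passing -mapper map.py so it matches the file name shipped with -file are the other usual fixes for this error.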
I have set up an EMR step in AWS Data Pipeline. The step command looks like this:
/usr/lib/hadoop-mapreduce/hadoop-streaming.jar,-input,s3n://input-bucket/input-file,-output,s3://output/output-dir,-mapper,/bin/cat,-reducer,reducer.py,-file,/scripts/reducer.py,-file,/params/parameters.bin
Run Code Online (Sandbox Code Playgroud)
I am getting the following error:
Error: java.lang.RuntimeException: PipeMapRed.waitOutputThreads(): subprocess failed with code 1
at org.apache.hadoop.streaming.PipeMapRed.waitOutputThreads(PipeMapRed.java:322)
at org.apache.hadoop.streaming.PipeMapRed.mapRedFinished(PipeMapRed.java:535)
at org.apache.hadoop.streaming.PipeReducer.close(PipeReducer.java:134)
at org.apache.hadoop.io.IOUtils.cleanup(IOUtils.java:244)
at org.apache.hadoop.mapred.ReduceTask.runOldReducer(ReduceTask.java:467)
at org.apache.hadoop.mapred.ReduceTask.run(ReduceTask.java:393)
at org.apache.hadoop.mapred.YarnChild$2.run(YarnChild.java:164)
at java.security.AccessController.doPrivileged(Native Method)
at javax.security.auth.Subject.doAs(Subject.java:422)
at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1698)
at org.apache.hadoop.mapred.YarnChild.main(YarnChild.java:158)
Container killed by the ApplicationMaster.
Container killed on request. Exit code is 143
Container exited with a non-zero exit code 143
Error: java.lang.RuntimeException: PipeMapRed.waitOutputThreads(): subprocess failed with code 1
at org.apache.hadoop.streaming.PipeMapRed.waitOutputThreads(PipeMapRed.java:322)
at org.apache.hadoop.streaming.PipeMapRed.mapRedFinished(PipeMapRed.java:535)
at org.apache.hadoop.streaming.PipeReducer.close(PipeReducer.java:134)
at org.apache.hadoop.io.IOUtils.cleanup(IOUtils.java:244)
at org.apache.hadoop.mapred.ReduceTask.runOldReducer(ReduceTask.java:467)
at org.apache.hadoop.mapred.ReduceTask.run(ReduceTask.java:393)
at org.apache.hadoop.mapred.YarnChild$2.run(YarnChild.java:164)
at java.security.AccessController.doPrivileged(Native Method) …

I am trying to run NLTK in a Hadoop environment. Below is the command I used for execution:
bin/hadoop jar $HADOOP_HOME/contrib/streaming/hadoop-streaming-1.0.4.jar -input /user/NLTK/input/ -output /user/NLTK/output1/ -file /home/hduser/softwares/NLTK/unsupervised_sentiment-master.zip -mapper /home/hduser/softwares/NLTK/unsupervised_sentiment-master/sentiment.py
unsupervised_sentiment-master.zip contains all the dependent files that sentiment.py needs.
I am getting:
java.lang.RuntimeException: PipeMapRed.waitOutputThreads(): subprocess failed with code 2
at org.apache.hadoop.streaming.PipeMapRed.waitOutputThreads(PipeMapRed.java:362)
at org.apache.hadoop.streaming.PipeMapRed.mapRedFinished(PipeMapRed.java:576)
at org.apache.hadoop.streaming.PipeMapper.close(PipeMapper.java:135)
at org.apache.hadoop.mapred.MapRunner.run(MapRunner.java:57)
at org.apache.hadoop.streaming.PipeMapRunner.run(PipeMapRunner.java:36)
at org.apache.hadoop.mapred.MapTask.runOldMapper(MapTask.java:436)
at org.apache.hadoop.mapred.MapTask.run(MapTask.java:372)
at org.apache.hadoop.mapred.Child$4.run(Child.java:255)
at java.security.AccessController.doPrivileged(Native Method)
at javax.security.auth.Subject.doAs(Subject.java:415)
at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1121)
at org.apache.hadoop.mapred.Child.main(Child.java:249)
Any help would be greatly appreciated!
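One detail worth checking when a streaming mapper with zipped dependencies dies with code 2: the archive shipped with -file is placed in the task's working directory, but Python does not search inside it automatically. A sketch of the usual fix at the top of the mapper (the imported module name is hypothetical):

#!/usr/bin/env python
import sys

# The archive shipped with -file lands in the task's working directory
# but is not importable by default; putting it on sys.path lets
# zipimport load pure-Python modules from inside it.
sys.path.insert(0, 'unsupervised_sentiment-master.zip')

# import sentiment_module  # hypothetical: a module packed inside the zip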