bgc*_*ode 8 python hadoop mapreduce amazon-web-services elastic-map-reduce
当我这样做时,一切都在当地正常工作:
cat input | python mapper.py | sort | python reducer.py
Run Code Online (Sandbox Code Playgroud)
但是,当我在AWS Elastic Mapreduce上运行流式MapReduce作业时,作业无法成功完成.在mapper.py通过运行部分方式(我知道,因为写的这stderr沿途).映射器被"Broken Pipe"错误中断,我可以在失败后从任务尝试的syslog中检索到该错误:
java.io.IOException: Broken pipe
at java.io.FileOutputStream.writeBytes(Native Method)
at java.io.FileOutputStream.write(FileOutputStream.java:282)
at java.io.BufferedOutputStream.write(BufferedOutputStream.java:105)
at java.io.BufferedOutputStream.flushBuffer(BufferedOutputStream.java:65)
at java.io.BufferedOutputStream.write(BufferedOutputStream.java:109)
at java.io.DataOutputStream.write(DataOutputStream.java:90)
at org.apache.hadoop.streaming.io.TextInputWriter.writeUTF8(TextInputWriter.java:72)
at org.apache.hadoop.streaming.io.TextInputWriter.writeValue(TextInputWriter.java:51)
at org.apache.hadoop.streaming.PipeMapper.map(PipeMapper.java:109)
at org.apache.hadoop.mapred.MapRunner.run(MapRunner.java:50)
at org.apache.hadoop.streaming.PipeMapRunner.run(PipeMapRunner.java:36)
at org.apache.hadoop.mapred.MapTask.runOldMapper(MapTask.java:441)
at org.apache.hadoop.mapred.MapTask.run(MapTask.java:377)
at org.apache.hadoop.mapred.Child$4.run(Child.java:255)
at java.security.AccessController.doPrivileged(Native Method)
at javax.security.auth.Subject.doAs(Subject.java:396)
at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1059)
at org.apache.hadoop.mapred.Child.main(Child.java:249)
2012-03-26 07:19:05,400 WARN org.apache.hadoop.streaming.PipeMapRed (main): java.io.IOException: Broken pipe
at java.io.FileOutputStream.writeBytes(Native Method)
at java.io.FileOutputStream.write(FileOutputStream.java:282)
at java.io.BufferedOutputStream.write(BufferedOutputStream.java:105)
at java.io.BufferedOutputStream.flushBuffer(BufferedOutputStream.java:65)
at java.io.BufferedOutputStream.flush(BufferedOutputStream.java:123)
at java.io.DataOutputStream.flush(DataOutputStream.java:106)
at org.apache.hadoop.streaming.PipeMapRed.mapRedFinished(PipeMapRed.java:579)
at org.apache.hadoop.streaming.PipeMapper.map(PipeMapper.java:124)
at org.apache.hadoop.mapred.MapRunner.run(MapRunner.java:50)
at org.apache.hadoop.streaming.PipeMapRunner.run(PipeMapRunner.java:36)
at org.apache.hadoop.mapred.MapTask.runOldMapper(MapTask.java:441)
at org.apache.hadoop.mapred.MapTask.run(MapTask.java:377)
at org.apache.hadoop.mapred.Child$4.run(Child.java:255)
at java.security.AccessController.doPrivileged(Native Method)
at javax.security.auth.Subject.doAs(Subject.java:396)
at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1059)
at org.apache.hadoop.mapred.Child.main(Child.java:249)
2012-03-26 07:19:05,400 INFO org.apache.hadoop.streaming.PipeMapRed (main): mapRedFinished
2012-03-26 07:19:05,400 WARN org.apache.hadoop.streaming.PipeMapRed (main): java.io.IOException: Bad file descriptor
at java.io.FileOutputStream.writeBytes(Native Method)
at java.io.FileOutputStream.write(FileOutputStream.java:282)
at java.io.BufferedOutputStream.write(BufferedOutputStream.java:105)
at java.io.BufferedOutputStream.flushBuffer(BufferedOutputStream.java:65)
at java.io.BufferedOutputStream.flush(BufferedOutputStream.java:123)
at java.io.DataOutputStream.flush(DataOutputStream.java:106)
at org.apache.hadoop.streaming.PipeMapRed.mapRedFinished(PipeMapRed.java:579)
at org.apache.hadoop.streaming.PipeMapper.close(PipeMapper.java:135)
at org.apache.hadoop.mapred.MapRunner.run(MapRunner.java:57)
at org.apache.hadoop.streaming.PipeMapRunner.run(PipeMapRunner.java:36)
at org.apache.hadoop.mapred.MapTask.runOldMapper(MapTask.java:441)
at org.apache.hadoop.mapred.MapTask.run(MapTask.java:377)
at org.apache.hadoop.mapred.Child$4.run(Child.java:255)
at java.security.AccessController.doPrivileged(Native Method)
at javax.security.auth.Subject.doAs(Subject.java:396)
at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1059)
at org.apache.hadoop.mapred.Child.main(Child.java:249)
2012-03-26 07:19:05,400 INFO org.apache.hadoop.streaming.PipeMapRed (main): mapRedFinished
2012-03-26 07:19:05,405 INFO org.apache.hadoop.streaming.PipeMapRed (Thread-13): MRErrorThread done
2012-03-26 07:19:05,408 INFO org.apache.hadoop.mapred.TaskLogsTruncater (main): Initializing logs' truncater with mapRetainSize=-1 and reduceRetainSize=-1
2012-03-26 07:19:05,519 INFO org.apache.hadoop.io.nativeio.NativeIO (main): Initialized cache for UID to User mapping with a cache timeout of 14400 seconds.
2012-03-26 07:19:05,520 INFO org.apache.hadoop.io.nativeio.NativeIO (main): Got UserName hadoop for UID 106 from the native implementation
2012-03-26 07:19:05,522 WARN org.apache.hadoop.mapred.Child (main): Error running child
java.io.IOException: log:null
R/W/S=7018/3/0 in:NA [rec/s] out:NA [rec/s]
minRecWrittenToEnableSkip_=9223372036854775807 LOGNAME=null
HOST=null
USER=hadoop
HADOOP_USER=null
last Hadoop input: |null|
last tool output: |text/html 1|
Date: Mon Mar 26 07:19:05 UTC 2012
java.io.IOException: Broken pipe
at java.io.FileOutputStream.writeBytes(Native Method)
at java.io.FileOutputStream.write(FileOutputStream.java:282)
at java.io.BufferedOutputStream.write(BufferedOutputStream.java:105)
at java.io.BufferedOutputStream.flushBuffer(BufferedOutputStream.java:65)
at java.io.BufferedOutputStream.write(BufferedOutputStream.java:109)
at java.io.DataOutputStream.write(DataOutputStream.java:90)
at org.apache.hadoop.streaming.io.TextInputWriter.writeUTF8(TextInputWriter.java:72)
at org.apache.hadoop.streaming.io.TextInputWriter.writeValue(TextInputWriter.java:51)
at org.apache.hadoop.streaming.PipeMapper.map(PipeMapper.java:109)
at org.apache.hadoop.mapred.MapRunner.run(MapRunner.java:50)
at org.apache.hadoop.streaming.PipeMapRunner.run(PipeMapRunner.java:36)
at org.apache.hadoop.mapred.MapTask.runOldMapper(MapTask.java:441)
at org.apache.hadoop.mapred.MapTask.run(MapTask.java:377)
at org.apache.hadoop.mapred.Child$4.run(Child.java:255)
at java.security.AccessController.doPrivileged(Native Method)
at javax.security.auth.Subject.doAs(Subject.java:396)
at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1059)
at org.apache.hadoop.mapred.Child.main(Child.java:249)
at org.apache.hadoop.streaming.PipeMapper.map(PipeMapper.java:125)
at org.apache.hadoop.mapred.MapRunner.run(MapRunner.java:50)
at org.apache.hadoop.streaming.PipeMapRunner.run(PipeMapRunner.java:36)
at org.apache.hadoop.mapred.MapTask.runOldMapper(MapTask.java:441)
at org.apache.hadoop.mapred.MapTask.run(MapTask.java:377)
at org.apache.hadoop.mapred.Child$4.run(Child.java:255)
at java.security.AccessController.doPrivileged(Native Method)
at javax.security.auth.Subject.doAs(Subject.java:396)
at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1059)
at org.apache.hadoop.mapred.Child.main(Child.java:249)
2012-03-26 07:19:05,525 INFO org.apache.hadoop.mapred.Task (main): Runnning cleanup for the task
2012-03-26 07:19:05,526 INFO org.apache.hadoop.mapred.DirectFileOutputCommitter (main): Nothing to clean up on abort since there are no temporary files written
Run Code Online (Sandbox Code Playgroud)
这是mapper.py.请注意,我写信给stderr为自己提供调试信息:
#!/usr/bin/env python
import sys
from warc import ARCFile
def main():
warc_file = ARCFile(fileobj=sys.stdin)
for web_page in warc_file:
print >> sys.stderr, '%s\t%s' % (web_page.header.content_type, 1) #For debugging
print '%s\t%s' % (web_page.header.content_type, 1)
print >> sys.stderr, 'done' #For debugging
if __name__ == "__main__":
main()
Run Code Online (Sandbox Code Playgroud)
以下是我在stderr中运行mapper.py时的任务尝试:
text/html 1
text/html 1
text/html 1
Run Code Online (Sandbox Code Playgroud)
基本上,循环运行3次然后突然停止而没有python抛出任何错误.(注意:它应该输出数千行).即使是未被捕获的例外也应该出现在stderr中.
因为MapReduce在我的本地计算机上运行完全正常,我的猜测是Hadoop如何处理我从mapper.py打印的输出这是一个问题.但我对问题可能是什么一无所知.
gha*_*yes 10
您的流式处理(您的Python脚本)过早终止.这可能是因为它认为输入是完整的(例如解释EOF)或吞下的异常.无论哪种方式,Hadoop都试图通过STDIN输入到您的脚本,但由于应用程序已终止(因此STDIN不再是有效的文件描述符),您将收到BrokenPipe错误.我建议在脚本中添加stderr跟踪,以查看导致问题的输入行.快乐的编码,
-Geoff
如上所述,但让我试着澄清一下 - 你必须阻止stdin,即使你不需要它!这与Linux管道不一样,所以不要让它欺骗你.直观地说,Streaming站起来执行你的可执行文件,然后说,"在我为你输入信息的同时在这里等待".如果你的可执行文件因任何原因停止流发送给您的输入的100%之前,流媒体说,"你在哪该可执行文件去,我站了起来?...... Hmmmm ...管道损坏,让我提出这个异常!" 所以,这里有一些python代码,它所做的就是cat所做的,但你会注意到,在处理完所有输入之前,这段代码不会退出,这是关键点:
#!/usr/bin/python
import sys
while True:
s = sys.stdin.readline()
if not s:
break
sys.stdout.write(s)
Run Code Online (Sandbox Code Playgroud)
| 归档时间: |
|
| 查看次数: |
8500 次 |
| 最近记录: |