Edd*_*ied 9 hadoop hadoop-streaming
我有一个hadoop流作业,其输出不包含键/值对.您可以将其视为仅限值对或仅键对.
我的流式缩减器(PHP脚本)输出由换行符分隔的记录.Hadoop流将其视为没有值的密钥,并在换行符之前插入一个选项卡.这个额外的标签是不需要的
我该如何删除它?
我正在使用带有AWS EMR的hadoop 1.0.3.我下载了hadoop 1.0.3的源代码,并在hadoop-1.0.3/src/contrib/streaming/src/java/org/apache/hadoop/streaming/PipeReducer.java中找到了这段代码:
reduceOutFieldSeparator = job_.get("stream.reduce.output.field.separator", "\t").getBytes("UTF-8");
Run Code Online (Sandbox Code Playgroud)
所以我试着把工作-D stream.reduce.output.field.separator=作为一个参数传递给没有运气的工作.我也尝试过-D mapred.textoutputformat.separator=和-D mapreduce.output.textoutputformat.separator=没有运气.
我当然搜索谷歌,我找不到任何工作.一个搜索结果甚至表明,没有任何论据可以传递以达到预期的结果(但是,在这种情况下,hadoop版本真的很老).
这是我的代码(为了便于阅读,添加了换行符):
hadoop jar streaming.jar -files s3n://path/to/a/file.json#file.json
-D mapred.output.compress=true -D stream.reduce.output.field.separator=
-input s3n://path/to/some/input/*/* -output hdfs:///path/to/output/dir
-mapper 'php my_mapper.php' -reducer 'php my_reducer.php'
Run Code Online (Sandbox Code Playgroud)
Mat*_* S. 10
对其他人有帮助,使用上面的提示,我能够做一个实现:
CustomOutputFormat<K, V> extends org.apache.hadoop.mapred.TextOutputFormat<K, V> {....}
Run Code Online (Sandbox Code Playgroud)
只需将'getRecordWriter'的内置实现中的一行更改为:
String keyValueSeparator = job.get("mapred.textoutputformat.separator", "");
Run Code Online (Sandbox Code Playgroud)
代替:
String keyValueSeparator = job.get("mapred.textoutputformat.separator", "\t");
Run Code Online (Sandbox Code Playgroud)
在将其编译成Jar并将其包含在我的hadoop流式呼叫中(通过hadoop流式传输的指令)之后,调用看起来像:
hadoop jar /usr/lib/hadoop/contrib/streaming/hadoop-streaming-1.0.3.jar \
-archives 'hdfs:///user/the/path/to/your/jar/onHDFS/theNameOfTheJar.jar' \
-libjars theNameOfTheJar.jar \
-outputformat com.yourcompanyHere.package.path.tojavafile.CustomOutputFormat \
-file yourMapper.py -mapper yourMapper.py \
-file yourReducer.py -reducer yourReducer.py \
-input $yourInputFile \
-output $yourOutputDirectoryOnHDFS
Run Code Online (Sandbox Code Playgroud)
我还将jar包含在我发出的调用文件夹中.
它对我的需求非常有效(并且它在减速器之后的行末没有创建标签).
更新:根据评论暗示这确实对其他人有用,这里是我的CustomOutputFormat.java文件的完整源代码:
import java.io.DataOutputStream;
import java.io.IOException;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.compress.CompressionCodec;
import org.apache.hadoop.io.compress.GzipCodec;
import org.apache.hadoop.mapred.FileOutputFormat;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.RecordWriter;
import org.apache.hadoop.mapred.TextOutputFormat;
import org.apache.hadoop.util.Progressable;
import org.apache.hadoop.util.ReflectionUtils;
public class CustomOutputFormat<K, V> extends TextOutputFormat<K, V> {
public RecordWriter<K, V> getRecordWriter(FileSystem ignored, JobConf job, String name,
Progressable progress) throws IOException {
boolean isCompressed = getCompressOutput(job);
//Channging the default from '\t' to blank
String keyValueSeparator = job.get("mapred.textoutputformat.separator", ""); // '\t'
if (!isCompressed) {
Path file = FileOutputFormat.getTaskOutputPath(job, name);
FileSystem fs = file.getFileSystem(job);
FSDataOutputStream fileOut = fs.create(file, progress);
return new LineRecordWriter<K, V>(fileOut, keyValueSeparator);
} else {
Class<? extends CompressionCodec> codecClass = getOutputCompressorClass(job,
GzipCodec.class);
// create the named codec
CompressionCodec codec = ReflectionUtils.newInstance(codecClass, job);
// build the filename including the extension
Path file = FileOutputFormat.getTaskOutputPath(job, name + codec.getDefaultExtension());
FileSystem fs = file.getFileSystem(job);
FSDataOutputStream fileOut = fs.create(file, progress);
return new LineRecordWriter<K, V>(new DataOutputStream(
codec.createOutputStream(fileOut)), keyValueSeparator);
}
}
}
Run Code Online (Sandbox Code Playgroud)
仅供参考:对于您的使用环境,请务必检查这不会对您的映射器和reducer之间的hadoop-streaming管理交互(在分离键与值之间)产生负面影响.澄清:
根据我的测试 - 如果你的数据的每一行都有一个'tab'(每侧有一些东西),你可以保留内置的默认值:流将在第一个标签之前解释第一个内容你的'钥匙',以及它之后的那一行作为你的'价值'.因此,它没有看到"空值",并且不会附加在缩减器后显示的选项卡.(您将看到您的最终输出按照"键"的值排序,该值在每行中解释为在每个选项卡之前发生的内容.)
相反,如果您的数据中没有选项卡,并且您没有使用上述技巧覆盖默认值,那么您将在运行完成后看到选项卡,上面的覆盖将成为修复.
查看org.apache.hadoop.mapreduce.lib.output.TextOutputFormat源代码,我看到两件事:
write(key,value)如果键或值为非null,则该方法会写入分隔符\t当mapred.textoutputformat.separator返回null(我假设发生了)时,始终使用default()设置分隔符-D stream.reduce.output.field.separator=您唯一的解决方案可能是编写自己的OutputFormat,解决这两个问题.
在我的任务中,我想重新格式化一行
id1|val1|val2|val3
id1|val1
Run Code Online (Sandbox Code Playgroud)
成:
id1|val1,val2,val3
id2|val1
Run Code Online (Sandbox Code Playgroud)
我有一个自定义映射器(Perl脚本)来转换行.对于此任务,我最初尝试将其作为仅键输入(或仅限值)输入,但使用尾随选项卡获得结果.
起初我刚刚指定:
-D stream.map.input.field.separator ='|' -D stream.map.output.field.separator ='|'
这给了映射器一个键值对,因为我的映射无论如何都需要一个键.但是这个输出现在在第一个字段之后有了标签
当我添加时,我得到了所需的输出:
-D mapred.textoutputformat.separator ='|'
如果我没有设置它或设置为空白
-D mapred.textoutputformat.separator =
然后我会在第一个字段后再次获得一个标签.
一看到TextOutputFormat的源代码就有意义了
| 归档时间: |
|
| 查看次数: |
6749 次 |
| 最近记录: |