Hadoop流 - 从reducer输出中删除尾随选项卡

Edd*_*ied 9 hadoop hadoop-streaming

我有一个hadoop流作业,其输出不包含键/值对.您可以将其视为仅限值对或仅键对.

我的流式缩减器(PHP脚本)输出由换行符分隔的记录.Hadoop流将其视为没有值的密钥,并在换行符之前插入一个选项卡.这个额外的标签是不需要的

我该如何删除它?

我正在使用带有AWS EMR的hadoop 1.0.3.我下载了hadoop 1.0.3的源代码,并在hadoop-1.0.3/src/contrib/streaming/src/java/org/apache/hadoop/streaming/PipeReducer.java中找到了这段代码:

reduceOutFieldSeparator = job_.get("stream.reduce.output.field.separator", "\t").getBytes("UTF-8");
Run Code Online (Sandbox Code Playgroud)

所以我试着把工作-D stream.reduce.output.field.separator=作为一个参数传递给没有运气的工作.我也尝试过-D mapred.textoutputformat.separator=-D mapreduce.output.textoutputformat.separator=没有运气.

我当然搜索谷歌,我找不到任何工作.一个搜索结果甚至表明,没有任何论据可以传递以达到预期的结果(但是,在这种情况下,hadoop版本真的很老).

这是我的代码(为了便于阅读,添加了换行符):

hadoop jar streaming.jar -files s3n://path/to/a/file.json#file.json
    -D mapred.output.compress=true -D stream.reduce.output.field.separator=
    -input s3n://path/to/some/input/*/* -output hdfs:///path/to/output/dir
    -mapper 'php my_mapper.php' -reducer 'php my_reducer.php'
Run Code Online (Sandbox Code Playgroud)

Mat*_* S. 10

对其他人有帮助,使用上面的提示,我能够做一个实现:

CustomOutputFormat<K, V> extends org.apache.hadoop.mapred.TextOutputFormat<K, V> {....}
Run Code Online (Sandbox Code Playgroud)

只需将'getRecordWriter'的内置实现中的一行更改为:

String keyValueSeparator = job.get("mapred.textoutputformat.separator", ""); 
Run Code Online (Sandbox Code Playgroud)

代替:

String keyValueSeparator = job.get("mapred.textoutputformat.separator", "\t"); 
Run Code Online (Sandbox Code Playgroud)

在将其编译成Jar并将其包含在我的hadoop流式呼叫中(通过hadoop流式传输的指令)之后,调用看起来像:

hadoop   jar  /usr/lib/hadoop/contrib/streaming/hadoop-streaming-1.0.3.jar     \
-archives 'hdfs:///user/the/path/to/your/jar/onHDFS/theNameOfTheJar.jar' \
-libjars theNameOfTheJar.jar \
-outputformat com.yourcompanyHere.package.path.tojavafile.CustomOutputFormat  \
-file yourMapper.py    -mapper  yourMapper.py     \
-file yourReducer.py   -reducer yourReducer.py    \
-input $yourInputFile    \
-output $yourOutputDirectoryOnHDFS
Run Code Online (Sandbox Code Playgroud)

我还将jar包含在我发出的调用文件夹中.

它对我的需求非常有效(并且它在减速器之后的行末没有创建标签).


更新:根据评论暗示这确实对其他人有用,这里是我的CustomOutputFormat.java文件的完整源代码:

import java.io.DataOutputStream;
import java.io.IOException;

import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.compress.CompressionCodec;
import org.apache.hadoop.io.compress.GzipCodec;
import org.apache.hadoop.mapred.FileOutputFormat;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.RecordWriter;
import org.apache.hadoop.mapred.TextOutputFormat;
import org.apache.hadoop.util.Progressable;
import org.apache.hadoop.util.ReflectionUtils;

public class CustomOutputFormat<K, V> extends TextOutputFormat<K, V> {

    public RecordWriter<K, V> getRecordWriter(FileSystem ignored, JobConf job, String name,
        Progressable progress) throws IOException {
    boolean isCompressed = getCompressOutput(job);

    //Channging the default from '\t' to blank
    String keyValueSeparator = job.get("mapred.textoutputformat.separator", ""); // '\t'
    if (!isCompressed) {
        Path file = FileOutputFormat.getTaskOutputPath(job, name);
        FileSystem fs = file.getFileSystem(job);
        FSDataOutputStream fileOut = fs.create(file, progress);
        return new LineRecordWriter<K, V>(fileOut, keyValueSeparator);
    } else {
        Class<? extends CompressionCodec> codecClass = getOutputCompressorClass(job,
            GzipCodec.class);
        // create the named codec
        CompressionCodec codec = ReflectionUtils.newInstance(codecClass, job);
        // build the filename including the extension
        Path file = FileOutputFormat.getTaskOutputPath(job, name + codec.getDefaultExtension());
        FileSystem fs = file.getFileSystem(job);
        FSDataOutputStream fileOut = fs.create(file, progress);
        return new LineRecordWriter<K, V>(new DataOutputStream(
            codec.createOutputStream(fileOut)), keyValueSeparator);
    }
    }
}
Run Code Online (Sandbox Code Playgroud)

仅供参考:对于您的使用环境,请务必检查这不会对您的映射器和reducer之间的hadoop-streaming管理交互(在分离键与值之间)产生负面影响.澄清:

  • 根据我的测试 - 如果你的数据的每一行都有一个'tab'(每侧有一些东西),你可以保留内置的默认值:流将在第一个标签之前解释第一个内容你的'钥匙',以及它之后的那一行作为你的'价值'.因此,它没有看到"空值",并且不会附加在缩减器后显示的选项卡.(您将看到您的最终输出按照"键"的值排序,该值在每行中解释为在每个选项卡之前发生的内容.)

  • 相反,如果您的数据中没有选项卡,并且您没有使用上述技巧覆盖默认值,那么您将在运行完成后看到选项卡,上面的覆盖将成为修复.


lib*_*ack 7

查看org.apache.hadoop.mapreduce.lib.output.TextOutputFormat源代码,我看到两件事:

  1. write(key,value)如果键或值为非null,则该方法会写入分隔符
  2. \tmapred.textoutputformat.separator返回null(我假设发生了)时,始终使用default()设置分隔符-D stream.reduce.output.field.separator=

您唯一的解决方案可能是编写自己的OutputFormat,解决这两个问题.

我的测试

在我的任务中,我想重新格式化一行

id1|val1|val2|val3
id1|val1
Run Code Online (Sandbox Code Playgroud)

成:

id1|val1,val2,val3
id2|val1
Run Code Online (Sandbox Code Playgroud)

我有一个自定义映射器(Perl脚本)来转换行.对于此任务,我最初尝试将其作为仅键输入(或仅限值)输入,但使用尾随选项卡获得结果.

起初我刚刚指定:

-D stream.map.input.field.separator ='|' -D stream.map.output.field.separator ='|'

这给了映射器一个键值对,因为我的映射无论如何都需要一个键.但是这个输出现在在第一个字段之后有了标签

当我添加时,我得到了所需的输出:

-D mapred.textoutputformat.separator ='|'

如果我没有设置它或设置为空白

-D mapred.textoutputformat.separator =

然后我会在第一个字段后再次获得一个标签.

一看到TextOutputFormat的源代码就有意义了

  • 是的,但我认为你最终会得到'|' 而不是结果中的'\ t',而是密钥和数据之间的时间.对? (2认同)