创建像TextLine一样的Scalding Source,它将多个文件组合成单个映射器

sam*_*est 4 hadoop scala cascading scalding

我们有许多需要组合的小文件.在Scalding中,您可以使用TextLine文本行来读取文件.问题是我们每个文件得到1个映射器,但我们想要组合多个文件,以便它们由1个映射器处理.

我知道我们需要将输入格式更改为实现CombineFileInputFormat,这可能涉及使用cascadings CombinedHfs.我们无法弄清楚如何做到这一点,但它应该只是少数几行代码来定义我们自己的Scalding源,比如说CombineTextLine.

非常感谢能够提供代码的任何人.

作为一个方面的问题,我们有一些数据,在S3中,如果解决方案给出了S3文件作品也将是巨大的-我想这取决于是否CombineFileInputFormatCombinedHfs适用于S3.

Iva*_*lov 9

你在问题中得到了想法,所以这里有可能是你的解决方案.

创建自己的输入格式,扩展CombineFileInputFormat并使用您自己的自定义RecordReader.我正在向您展示Java代码,但如果您愿意,可以轻松地将其转换为Scala.

import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.FileSplit;
import org.apache.hadoop.mapred.InputSplit;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.LineRecordReader;
import org.apache.hadoop.mapred.RecordReader;
import org.apache.hadoop.mapred.Reporter;
import org.apache.hadoop.mapred.lib.CombineFileInputFormat;
import org.apache.hadoop.mapred.lib.CombineFileRecordReader;
import org.apache.hadoop.mapred.lib.CombineFileSplit;

public class CombinedInputFormat<K, V> extends CombineFileInputFormat<K, V> {

    public static class MyKeyValueLineRecordReader implements RecordReader<LongWritable,Text> {
        private final RecordReader<LongWritable,Text> delegate;

        public MyKeyValueLineRecordReader(CombineFileSplit split, Configuration conf, Reporter reporter, Integer idx) throws IOException {
            FileSplit fileSplit = new FileSplit(split.getPath(idx), split.getOffset(idx), split.getLength(idx), split.getLocations());
            delegate = new LineRecordReader(conf, fileSplit);
        }

        @Override
        public boolean next(LongWritable key, Text value) throws IOException {
            return delegate.next(key, value);
        }

        @Override
        public LongWritable createKey() {
            return delegate.createKey();
        }

        @Override
        public Text createValue() {
            return delegate.createValue();
        }

        @Override
        public long getPos() throws IOException {
            return delegate.getPos();
        }

        @Override
        public void close() throws IOException {
            delegate.close();
        }

        @Override
        public float getProgress() throws IOException {
            return delegate.getProgress();
        }
    }

    @Override
    public RecordReader getRecordReader(InputSplit split, JobConf job, Reporter reporter) throws IOException {
        return new CombineFileRecordReader(job, (CombineFileSplit) split, reporter, (Class) MyKeyValueLineRecordReader.class);
    }

}
Run Code Online (Sandbox Code Playgroud)

然后,您需要扩展TextLine类并使其使用您刚刚定义的自己的输入格式(从现在开始的Scala代码).

import cascading.scheme.hadoop.TextLine
import cascading.flow.FlowProcess
import org.apache.hadoop.mapred.{OutputCollector, RecordReader, JobConf}
import cascading.tap.Tap
import com.twitter.scalding.{FixedPathSource, TextLineScheme}
import cascading.scheme.Scheme

class CombineFileTextLine extends TextLine{

  override def sourceConfInit(flowProcess: FlowProcess[JobConf], tap: Tap[JobConf, RecordReader[_, _], OutputCollector[_, _]], conf: JobConf) {
    super.sourceConfInit(flowProcess, tap, conf)
    conf.setInputFormat(classOf[CombinedInputFormat[String, String]])
  }
}
Run Code Online (Sandbox Code Playgroud)

为您的组合输入创建一个方案.

trait CombineFileTextLineScheme extends TextLineScheme{

  override def hdfsScheme = new CombineFileTextLine().asInstanceOf[Scheme[JobConf,RecordReader[_,_],OutputCollector[_,_],_,_]]
}
Run Code Online (Sandbox Code Playgroud)

最后,创建您的源类:

case class CombineFileMultipleTextLine(p : String*) extends  FixedPathSource(p :_*) with CombineFileTextLineScheme
Run Code Online (Sandbox Code Playgroud)

如果要使用单个路径而不是多个路径,则对源类的更改是微不足道的.

我希望有所帮助.