Pse*_*udo 3 hadoop mapreduce output
我是Hadoop的新手,但这是我上个月的一个学习项目.
为了保持这种含糊不清以便对他人有用,让我先抛弃基本目标......假设:
我看过多个线程:
还有更多..我也一直在阅读Tom White的Hadoop书.我一直在急切地想要学习这一点.我经常在新API和旧API之间进行交换,这增加了尝试学习这一点的困惑.
许多人指向MultipleOutputs(或旧的api版本),但我似乎无法生成我想要的输出 - 例如,MultipleOutputs似乎不接受"/"来在write()中创建目录结构
创建具有所需输出结构的文件需要采取哪些步骤?目前我有一个WholeFileInputFormat类,以及具有(NullWritable K,ByteWritable V)对的相关RecordReader(如果需要可以更改)
我的地图设置:
public class MapClass extends Mapper<NullWritable, BytesWritable, Text, BytesWritable> {
private Text filenameKey;
private MultipleOutputs<NullWritable, Text> mos;
@Override
protected void setup(Context context) throws IOException, InterruptedException {
InputSplit split = context.getInputSplit();
Path path = ((FileSplit) split).getPath();
filenameKey = new Text(path.toString().substring(38)); // bad hackjob, until i figure out a better way.. removes hdfs://master:port/user/hduser/path/
mos = new MultipleOutputs(context);
}
}
Run Code Online (Sandbox Code Playgroud)
还有一个清理()函数调用mos.close()和地图()功能是目前未知的(我需要帮助这里)
这是否足以将新手指向答案的方向?我的下一个想法是在每个map()任务中创建一个MultipleOutputs()对象,每个都有一个新的baseoutput字符串,但我不确定它是否有效甚至是正确的行动.
建议将不胜感激,程序中的任何内容都可以在此时更改,除了输入 - 我只是想学习框架 - 但我希望尽可能接近这个结果(稍后我可能会看看将记录与较大的文件相结合,但它们已经是每条记录20MB,我想确保它在我无法在记事本中读取之前有效
编辑:可以通过修改/扩展TextOutputFormat.class来解决这个问题吗?似乎它可能有一些可行的方法,但我不确定我需要覆盖哪些方法......
如果关闭推测性执行,则无法阻止您在映射器中手动创建输出文件夹结构/文件,并将记录写入它们(忽略输出上下文/收集器)
例如,扩展代码片段(设置方法),您可以执行类似这样的操作(基本上是多个输出正在执行的操作,但假设关闭推测执行以避免文件冲突,其中两个映射任务尝试写入相同的输出文件):
import java.io.IOException;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.RecordWriter;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
public class MultiOutputsMapper extends
Mapper<LongWritable, Text, NullWritable, NullWritable> {
protected String filenameKey;
private RecordWriter<Text, Text> writer;
private Text outputValue;
private Text outputKey;
@Override
protected void map(LongWritable key, Text value, Context context)
throws IOException, InterruptedException {
// operate on the input record
// ...
// write to output file using writer rather than context
writer.write(outputKey, outputValue);
}
@Override
protected void setup(Context context) throws IOException,
InterruptedException {
InputSplit split = context.getInputSplit();
Path path = ((FileSplit) split).getPath();
// extract parent folder and filename
filenameKey = path.getParent().getName() + "/" + path.getName();
// base output folder
final Path baseOutputPath = FileOutputFormat.getOutputPath(context);
// output file name
final Path outputFilePath = new Path(baseOutputPath, filenameKey);
// We need to override the getDefaultWorkFile path to stop the file being created in the _temporary/taskid folder
TextOutputFormat<Text, Text> tof = new TextOutputFormat<Text, Text>() {
@Override
public Path getDefaultWorkFile(TaskAttemptContext context,
String extension) throws IOException {
return outputFilePath;
}
};
// create a record writer that will write to the desired output subfolder
writer = tof.getRecordWriter(context);
}
@Override
protected void cleanup(Context context) throws IOException,
InterruptedException {
writer.close(context);
}
}
Run Code Online (Sandbox Code Playgroud)
一些要考虑的要点:
customerx/yyyy-MM-dd路径的文件或文件的文件夹内(如果文件的文件夹内,那么你就需要进行相应的修改-这实现假定有每个日期一个文件,文件名是YYYY-MM-DD)