SUD*_*HAN 5 java zip mapreduce zipoutputstream hadoop2
我的 MapReduce 作业需要从 HBase 读取记录,并把结果写入 zip 文件。客户明确要求 Reducer 的输出文件必须是 .zip 文件。
为此,我编写了一个 ZipFileOutputFormat 类,用于压缩记录并将其写入 zip 文件。
此外,我们不能先把所有行都缓存到内存缓冲区再统一遍历写出,因为有些文件包含多达 19GB 的记录,这样做会抛出 java.lang.OutOfMemoryError。
一切似乎都好,但有一个问题:
每个键都会单独创建一个 .zip 文件。在输出目录中我能看到许多输出文件,每个文件对应一个行键。我不知道如何把它们合并到同一个 zip 文件中。
这是我的 ZipFileOutputFormat.java 实现:
/**
 * A {@link FileOutputFormat} that writes each task's output records into a single {@code .zip}
 * archive created at the task's default work file (with a {@code .zip} extension).
 *
 * <p>Each distinct key becomes one entry in the archive. Consecutive records that share the same
 * key are appended to the same entry, separated by a newline. Records are streamed straight into
 * the archive, so nothing is buffered in memory per key.
 *
 * @param <K> output key type; its bytes ({@code BytesWritable}) or {@code toString()} form the
 *     entry name
 * @param <V> output value type; its bytes ({@code BytesWritable}) or UTF-8 {@code toString()} form
 *     the entry content
 */
public class ZipFileOutputFormat<K, V> extends FileOutputFormat<K, V> {

    /**
     * Streams records into one {@link ZipOutputStream}, grouping consecutive records with equal
     * keys into a single entry.
     *
     * <p>Entry names must be unique within a ZIP archive. A key that reappears after a different
     * key has been written, for example in a map-only job whose output is not sorted, still makes
     * {@link ZipOutputStream#putNextEntry} throw a {@code ZipException} for the duplicate entry.
     */
    public static class ZipRecordWriter<K, V> extends org.apache.hadoop.mapreduce.RecordWriter<K, V> {

        /** Written between consecutive records that land in the same entry. */
        private static final byte[] RECORD_SEPARATOR = {(byte) '\n'};

        private final ZipOutputStream zipOut;

        /** Name of the entry currently being written, or {@code null} before the first record. */
        private String currentEntryName;

        /**
         * @param fileOut destination stream; it is wrapped in a {@link ZipOutputStream} and closed
         *     by {@link #close(TaskAttemptContext)}
         */
        public ZipRecordWriter(FSDataOutputStream fileOut) {
            zipOut = new ZipOutputStream(fileOut);
        }

        /**
         * Finishes the archive (closing any open entry and writing the ZIP central directory) and
         * closes the underlying output stream.
         */
        @Override
        public void close(TaskAttemptContext context) throws IOException, InterruptedException {
            // ZipOutputStream.close() already calls finish(), which closes the current entry.
            // Nothing may be flushed afterwards, because the stream is closed at that point.
            zipOut.close();
        }

        /**
         * Appends {@code value} to the entry named after {@code key}. A new entry is started
         * whenever the key differs from the previous record's key.
         *
         * @throws IOException if writing fails, or (as a {@code ZipException}) if the key names an
         *     entry that was already closed earlier in this archive
         */
        @Override
        public void write(K key, V value) throws IOException {
            String entryName = toEntryName(key);
            if (entryName.equals(currentEntryName)) {
                // Same key as the previous record: keep writing into the open entry.
                zipOut.write(RECORD_SEPARATOR);
            } else {
                // putNextEntry closes the currently open entry (if any) before starting the new one.
                zipOut.putNextEntry(new ZipEntry(entryName));
                currentEntryName = entryName;
            }
            writeValue(value);
        }

        /** Derives the ZIP entry name from a key, decoding {@code BytesWritable} keys as UTF-8. */
        private static String toEntryName(Object key) {
            if (key instanceof BytesWritable) {
                BytesWritable bytes = (BytesWritable) key;
                // getBytes() returns the backing buffer; only the first getLength() bytes are valid.
                return new String(bytes.getBytes(), 0, bytes.getLength(),
                        java.nio.charset.StandardCharsets.UTF_8);
            }
            return key.toString();
        }

        /** Writes one value's payload into the currently open entry. */
        private void writeValue(Object value) throws IOException {
            if (value instanceof BytesWritable) {
                BytesWritable bytes = (BytesWritable) value;
                zipOut.write(bytes.getBytes(), 0, bytes.getLength());
            } else {
                zipOut.write(value.toString().getBytes(java.nio.charset.StandardCharsets.UTF_8));
            }
        }
    }

    /**
     * Creates the task's {@code .zip} work file and returns a writer that streams records into it.
     */
    @Override
    public org.apache.hadoop.mapreduce.RecordWriter<K, V> getRecordWriter(TaskAttemptContext job)
            throws IOException, InterruptedException {
        Path file = getDefaultWorkFile(job, ".zip");
        FileSystem fs = file.getFileSystem(job.getConfiguration());
        FSDataOutputStream fileOut = fs.create(file);
        return new ZipRecordWriter<K, V>(fileOut);
    }
}
Run Code Online (Sandbox Code Playgroud)
| 归档时间: |
|
| 查看次数: |
855 次 |
| 最近记录: |