yog*_*esh 22 rewrite hadoop fileoutputstream
我想在每天运行Hadoop作业时覆盖/重用现有的输出目录.实际上,输出目录将存储每天作业运行结果的汇总输出.如果我指定相同的输出目录,则会给出错误"输出目录已存在".
如何绕过此验证?
Tho*_*lut 15
在运行作业之前删除目录怎么样?
你可以通过shell来做到这一点:
hadoop fs -rmr /path/to/your/output/
Run Code Online (Sandbox Code Playgroud)
或通过Java API:
// configuration should contain reference to your namenode
FileSystem fs = FileSystem.get(new Configuration());
// true stands for recursively deleting the folder you gave
fs.delete(new Path("/path/to/your/output"), true);
Run Code Online (Sandbox Code Playgroud)
Don*_*ner 11
Jungblut的答案是你的直接解决方案.由于我从不相信自动化流程删除内容(我个人),我会建议一个替代方案:
我建议你不要试图覆盖,而是让你的工作输出名称动态,包括它的运行时间.
像" /path/to/your/output-2011-10-09-23-04/
" 这样的东西.这样你就可以保持周围的旧作业输出的情况下,你需要在重新审视在我的系统,它运行日常工作10+,我们结构输出是:/output/job1/2011/10/09/job1out/part-r-xxxxx
,/output/job1/2011/10/10/job1out/part-r-xxxxx
,等.
Hadoop TextInputFormat
(我猜你正在使用)不允许覆盖现有目录.可能原谅你发现你错误地删除了你(和你的集群)非常努力的东西的痛苦.
但是,如果您确定要让作业覆盖输出文件夹,我相信最简洁的方法是更改TextOutputFormat
一点:
public class OverwriteTextOutputFormat<K, V> extends TextOutputFormat<K, V>
{
public RecordWriter<K, V>
getRecordWriter(TaskAttemptContext job) throws IOException, InterruptedException
{
Configuration conf = job.getConfiguration();
boolean isCompressed = getCompressOutput(job);
String keyValueSeparator= conf.get("mapred.textoutputformat.separator","\t");
CompressionCodec codec = null;
String extension = "";
if (isCompressed)
{
Class<? extends CompressionCodec> codecClass =
getOutputCompressorClass(job, GzipCodec.class);
codec = (CompressionCodec) ReflectionUtils.newInstance(codecClass, conf);
extension = codec.getDefaultExtension();
}
Path file = getDefaultWorkFile(job, extension);
FileSystem fs = file.getFileSystem(conf);
FSDataOutputStream fileOut = fs.create(file, true);
if (!isCompressed)
{
return new LineRecordWriter<K, V>(fileOut, keyValueSeparator);
}
else
{
return new LineRecordWriter<K, V>(new DataOutputStream(codec.createOutputStream(fileOut)),keyValueSeparator);
}
}
}
Run Code Online (Sandbox Code Playgroud)
现在你用overwrite = true 创建FSDataOutputStream
(fs.create(file, true)
).
归档时间: |
|
查看次数: |
30350 次 |
最近记录: |