如何再次覆盖/重用Hadoop作业的现有输出路径和agian

yog*_*esh 22 rewrite hadoop fileoutputstream

我想在每天运行Hadoop作业时覆盖/重用现有的输出目录.实际上,输出目录将存储每天作业运行结果的汇总输出.如果我指定相同的输出目录,则会给出错误"输出目录已存在".

如何绕过此验证?

Tho*_*lut 15

在运行作业之前删除目录怎么样?

你可以通过shell来做到这一点:

hadoop fs -rmr /path/to/your/output/
Run Code Online (Sandbox Code Playgroud)

或通过Java API:

// configuration should contain reference to your namenode
FileSystem fs = FileSystem.get(new Configuration());
// true stands for recursively deleting the folder you gave
fs.delete(new Path("/path/to/your/output"), true);
Run Code Online (Sandbox Code Playgroud)


Don*_*ner 11

Jungblut的答案是你的直接解决方案.由于我从不相信自动化流程删除内容(我个人),我会建议一个替代方案:

我建议你不要试图覆盖,而是让你的工作输出名称动态,包括它的运行时间.

像" /path/to/your/output-2011-10-09-23-04/" 这样的东西.这样你就可以保持周围的旧作业输出的情况下,你需要在重新审视在我的系统,它运行日常工作10+,我们结构输出是:/output/job1/2011/10/09/job1out/part-r-xxxxx,/output/job1/2011/10/10/job1out/part-r-xxxxx,等.

  • 谢谢回复.我唯一的问题是 - 我不想删除现有的输出目录.每天当我运行我的工作时,我希望它的输出与现有输出(旧)合并.我想到的解决方案是生成/存储每日作业的输出到一些临时目录中,并通过某些脚本将临时文件夹结构复制粘贴到旧的输出目录中.我在s3上有像"Campaign_Id/Year/Month/day"的文件夹结构 (4认同)

har*_*rel 5

Hadoop TextInputFormat(我猜你正在使用)不允许覆盖现有目录.可能原谅你发现你错误地删除了你(和你的集群)非常努力的东西的痛苦.

但是,如果您确定要让作业覆盖输出文件夹,我相信最简洁的方法是更改TextOutputFormat一点:

public class OverwriteTextOutputFormat<K, V> extends TextOutputFormat<K, V>
{
      public RecordWriter<K, V> 
      getRecordWriter(TaskAttemptContext job) throws IOException, InterruptedException 
      {
          Configuration conf = job.getConfiguration();
          boolean isCompressed = getCompressOutput(job);
          String keyValueSeparator= conf.get("mapred.textoutputformat.separator","\t");
          CompressionCodec codec = null;
          String extension = "";
          if (isCompressed) 
          {
              Class<? extends CompressionCodec> codecClass = 
                      getOutputCompressorClass(job, GzipCodec.class);
              codec = (CompressionCodec) ReflectionUtils.newInstance(codecClass, conf);
              extension = codec.getDefaultExtension();
          }
          Path file = getDefaultWorkFile(job, extension);
          FileSystem fs = file.getFileSystem(conf);
          FSDataOutputStream fileOut = fs.create(file, true);
          if (!isCompressed) 
          {
              return new LineRecordWriter<K, V>(fileOut, keyValueSeparator);
          } 
          else 
          {
              return new LineRecordWriter<K, V>(new DataOutputStream(codec.createOutputStream(fileOut)),keyValueSeparator);
          }
      }
}
Run Code Online (Sandbox Code Playgroud)

现在你用overwrite = true 创建FSDataOutputStream(fs.create(file, true)).