如何防止hadoop作业在损坏的输入文件上失败

Jul*_*ias 8 hadoop mapreduce cascading

我正在许多输入文件上运行hadoop作业.但是如果其中一个文件被破坏,整个工作就会失败.

如何使作业忽略损坏的文件?也许为我写一些计数器/错误日志但不会失败整个工作

Chr*_*ite 7

这取决于你的工作失败的地方 - 如果一行损坏,并且你的map方法中某处抛出异常,那么你应该能够用try/catch包装map方法的主体并记录错误:

protected void map(LongWritable key, Text value, Context context) {
  try {
    // parse value to a long
    int val = Integer.parseInt(value.toString());

    // do something with key and val..
  } catch (NumberFormatException nfe) {
    // log error and continue
  }
}
Run Code Online (Sandbox Code Playgroud)

但是如果您的InputFormat的RecordReader抛出错误,那么您将需要修改映射器run(..)方法 - 谁的默认实现如下:

public void run(Context context) {
  setup(context);
  while (context.nextKeyValue()) {
    map(context.getCurrentKey(), context.getCurrentValue(), context);
  }
  cleanup(context);
}
Run Code Online (Sandbox Code Playgroud)

所以你可以修改它来尝试捕获context.nextKeyValue()调用上的异常,但是你必须小心忽略读者抛出的任何错误 - 例如IOExeption可能不是'可跳过'而只是忽略错误.

如果您已经编写了自己的InputFormat/RecordReader,并且您有一个特定的异常表示记录失败但允许您跳过并继续解析,那么这样的事情可能会起作用:

public void run(Context context) {
  setup(context);
  while (true) {
    try {
      if (!context.nextKeyValue()) { 
        break;
      } else {
        map(context.getCurrentKey(), context.getCurrentValue(), context);
      }
    } catch (SkippableRecordException sre) {
      // log error
    }

  }
  cleanup(context);
}
Run Code Online (Sandbox Code Playgroud)

但只是为了重新迭代 - 你的RecordReader必须能够在出错时恢复,否则上面的代码可能会让你进入无限循环.

对于您的特定情况 - 如果您只是想在第一次失败时忽略文件,那么您可以将run方法更新为更简单的方法:

public void run(Context context) {
  setup(context);
  try {
    while (context.nextKeyValue()) {
      map(context.getCurrentKey(), context.getCurrentValue(), context);
    }
    cleanup(context);
  } catch (Exception e) {
    // log error
  }
}
Run Code Online (Sandbox Code Playgroud)

最后一些警告:

  • 你需要确保它不是你的映射器代码导致异常被抛出,否则你会因为错误的原因而忽略文件
  • 非GZip压缩的GZip压缩文件在记录阅读器的初始化中实际上会失败 - 所以上面的内容不会捕获这种类型或错误(您需要编写自己的记录阅读器实现).对于在创建记录阅读器期间引发的任何文件错误,都是如此