Jul*_*ias 8 hadoop mapreduce cascading
我正在许多输入文件上运行hadoop作业.但是如果其中一个文件被破坏,整个工作就会失败.
如何使作业忽略损坏的文件?也许为我写一些计数器/错误日志但不会失败整个工作
这取决于你的工作失败的地方 - 如果一行损坏,并且你的map方法中某处抛出异常,那么你应该能够用try/catch包装map方法的主体并记录错误:
protected void map(LongWritable key, Text value, Context context) {
try {
// parse value to a long
int val = Integer.parseInt(value.toString());
// do something with key and val..
} catch (NumberFormatException nfe) {
// log error and continue
}
}
Run Code Online (Sandbox Code Playgroud)
但是如果您的InputFormat的RecordReader抛出错误,那么您将需要修改映射器run(..)方法 - 谁的默认实现如下:
public void run(Context context) {
setup(context);
while (context.nextKeyValue()) {
map(context.getCurrentKey(), context.getCurrentValue(), context);
}
cleanup(context);
}
Run Code Online (Sandbox Code Playgroud)
所以你可以修改它来尝试捕获context.nextKeyValue()调用上的异常,但是你必须小心忽略读者抛出的任何错误 - 例如IOExeption可能不是'可跳过'而只是忽略错误.
如果您已经编写了自己的InputFormat/RecordReader,并且您有一个特定的异常表示记录失败但允许您跳过并继续解析,那么这样的事情可能会起作用:
public void run(Context context) {
setup(context);
while (true) {
try {
if (!context.nextKeyValue()) {
break;
} else {
map(context.getCurrentKey(), context.getCurrentValue(), context);
}
} catch (SkippableRecordException sre) {
// log error
}
}
cleanup(context);
}
Run Code Online (Sandbox Code Playgroud)
但只是为了重新迭代 - 你的RecordReader必须能够在出错时恢复,否则上面的代码可能会让你进入无限循环.
对于您的特定情况 - 如果您只是想在第一次失败时忽略文件,那么您可以将run方法更新为更简单的方法:
public void run(Context context) {
setup(context);
try {
while (context.nextKeyValue()) {
map(context.getCurrentKey(), context.getCurrentValue(), context);
}
cleanup(context);
} catch (Exception e) {
// log error
}
}
Run Code Online (Sandbox Code Playgroud)
最后一些警告:
| 归档时间: |
|
| 查看次数: |
5700 次 |
| 最近记录: |