我正在使用Hadoop 0.20.2(无法更改),我想在输入路径中添加一个过滤器.数据如下:
/path1/test_a1
/path1/test_a2
/path1/train_a1
/path1/train_a2
Run Code Online (Sandbox Code Playgroud)
我只想处理所有带有火车的文件.
看一下FileInputFormat类建议使用:
FileInputFormat.setInputPathFilter(Job job, Class<? extends PathFilter> filter)
Run Code Online (Sandbox Code Playgroud)
这是我的问题开始的地方,因为PathFilter是一个接口 - 当然,我可以扩展接口但是我仍然没有实现.所以相反,我实现了接口:
class TrainFilter implements PathFilter
{
boolean accept(Path path)
{
return path.toString().contains("train");
}
}
Run Code Online (Sandbox Code Playgroud)
当我使用TrainFilter作为PathFilter时代码编译,但是当我运行它时,我得到一个例外,因为输入路径被搞砸了.如果不设置过滤器,我的代码将运行在/ path1下面的所有文件,但是,在设置过滤器时,它会抛出错误:
InvalidInputException: Input path does not exist hdfs://localhost:9000/path1
Run Code Online (Sandbox Code Playgroud)
以下是我在驱动程序代码中进行设置的方法:
job.setMapperClass(....class);
job.setInputFormatClass(....class);
job.setMapOutputKeyClass(...class);
job.setMapOutputValueClass(Text.class);
job.setOutputKeyClass(IntWritable.class);
job.setOutputValueClass(IntWritable.class);
FileInputFormat.setInputPathFilter(job, TrainFilter.class);
FileInputFormat.addInputPath(job, new Path("/path1/"));
FileOutputFormat.setOutputPath(job, new Path("/path2/"));
job.waitForCompletion(true);
Run Code Online (Sandbox Code Playgroud)
我在这里做错了什么建议?
编辑:我发现了问题.对PathFilter的第一次调用始终是目录本身(/ path1),并且由于它不包含("train"),因此目录本身无效,因此抛出异常.这让我想到另一个问题:如何测试任意路径是否是目录?据我所知,我需要一个FileSystem的引用,它不是PathFilter的默认参数之一.
或者,您可以尝试遍历给定目录中的所有文件,并检查文件名是否以train开头.例如:
Job job = new Job(conf, "myJob");
List<Path> inputhPaths = new ArrayList<Path>();
String basePath = "/user/hadoop/path";
FileSystem fs = FileSystem.get(conf);
FileStatus[] listStatus = fs.globStatus(new Path(basePath + "/train*"));
for (FileStatus fstat : listStatus) {
inputhPaths.add(fstat.getPath());
}
FileInputFormat.setInputPaths(job,
(Path[]) inputhPaths.toArray(new Path[inputhPaths.size()]));
Run Code Online (Sandbox Code Playgroud)