bil*_* ou 5 import mapreduce class sqoop
sqoop查询生成一个java文件,其中包含一个类,该类包含要在mapreduce中访问每行的列数据的代码.(Sqoop导入是在文本中完成的,没有--as-sequencefile选项,每个记录有1行,列之间有逗号)但是我们如何实际使用它?
我在这个类中找到了一个公共方法parse(),它将Text作为输入并填充了类的所有成员,因此为了练习我修改了wordcount应用程序,将一行文本从mapper中的TextInputFormat转换为一个instnace sqoop生成的类.但是当我调用parse()方法时,这会导致"未报告的exception.com.cloudera.sqoop.lib.RecordParser.ParseError;必须被捕获或声明被抛出".
可以这样做,还是使用每条记录中的数据填充类所必需的自定义InputFormat?
好吧,一旦你发现,这似乎是显而易见的,但作为一个 Java 初学者,这可能需要时间。
首先配置您的项目:只需将 sqoop 生成的 .java 文件添加到源文件夹中。我使用 eclipse 将其导入到我的类源文件夹中。
然后确保您正确配置了项目的 java 构建路径:
在项目的properties/java build path/libraries/add external jar:(for hadoop cdh4+)中添加以下jar文件:
/usr/lib/hadoop/hadoop-common.jar
/usr/lib/hadoop-[version]-mapreduce/hadoop-core.jar
/usr/lib/sqoop/sqoop-[sqoop-version]-cdh[cdh-version].jar
Run Code Online (Sandbox Code Playgroud)
然后调整你的mapreduce源代码:首先配置它:
public int run(String [] args) throws exception
{
Job job = new Job(getConf());
job.setJarByClass(YourClass.class);
job.setMapperClass(SqoopImportMap.class);
job.setReducerClass(SqoopImprtReduce.class);
FileInputFormat.addInputPath((job,"hdfs_path_to_your_sqoop_imported_file"));
FileOutputFormat.setOutputPath((job,"hdfs_output_path"));
// I simply use text as output for the mapper but it can be any class you designed
// as long as you implement it as a Writable
job.setMapOutputKeyClass(Text.Class);
job.setMapOutputValueClass(Text.Class);
job.setOutputKeyClass(Text.Class);
job.setOutputValueClass(Text.Class);
...
Run Code Online (Sandbox Code Playgroud)
现在配置您的映射器类。假设您的 sqoop 导入的 java 文件名为 Sqimp.java:并且您导入的表具有以下列:id、name、age 您的映射器类应如下所示:
public static class SqoopImportMap
extends Mapper<LongWritable, Text, Text, Text>
{
public void map(LongWritable k, Text v, Context context)
{
Sqimp s = new Sqimp();
try
{
// this is where the code generated by sqoop is used.
// it automatically casts one line of the imported data into an instance of the generated class,
// to let you access the data inside the columns easily
s.parse(v);
}
catch(ParseError pe) {// do something if there is an error.}
try
{
// now the imported data is accessible:
// e.g
if (s.age>30)
{
// submit the selected data to the mapper's output as a key value pair.
context.write(new Text(s.age),new Text(s.id));
}
}
catch(Exception ex)
{//do something about the error}
}
}
Run Code Online (Sandbox Code Playgroud)