cae*_*301 4 hadoop mapreduce avro
我的所有程序都使用hadoop的新MR1接口(org.apache.hadoop.mapreduce)编写,所以我也想使用avro的新的org.apache.avro.mapreduce.但它对我不起作用.
该程序接收avro数据的输入并输出相同的数据.我的程序背后的主要思想是将hadoop的Mapper和Reducer子类化为avro包装的键/值.这是我的工作驱动程序块:
AvroJob.setInputKeySchema(job, NetflowRecord.getClassSchema());
AvroJob.setOutputKeySchema(job, NetflowRecord.getClassSchema());
job.setMapperClass(MyAvroMap.class);
job.setReducerClass(MyAvroReduce.class);
job.setInputFormatClass(AvroKeyInputFormat.class);
job.setOutputFormatClass(AvroKeyOutputFormat.class);
job.setMapOutputKeyClass(AvroKey.class);
job.setMapOutputValueClass(AvroValue.class);
job.setOutputKeyClass(AvroKey.class);
job.setOutputValueClass(NullWritable.class);
Run Code Online (Sandbox Code Playgroud)
MyAvroMap和MyAvroReduce子类的定义分别是
public static class MyAvroMap extends Mapper<AvroKey<NetflowRecord>, NullWritable,
AvroKey<CharSequence>, AvroValue<NetflowRecord>>{ ... }
public static class MyAvroReduce extends Reducer<AvroKey<CharSequence>, AvroValue<NetflowRecord>,
AvroKey<NetflowRecord>, NullWritable>{ ... }
Run Code Online (Sandbox Code Playgroud)
有效的NetflowRecord是我的avro记录类.我得到了例外
java.lang.ClassCastException: class org.apache.avro.hadoop.io.AvroKey
Run Code Online (Sandbox Code Playgroud)
通过阅读hadoop和avro的源代码,我发现JobConf抛出异常以确保map键是WritableComparable的子类,就像这样(hadoop1.2.1,line759)
WritableComparator.get(getMapOutputKeyClass().asSubclass(WritableComparable.class));
Run Code Online (Sandbox Code Playgroud)
但avro表明,AvroKey和AvroValue只是一个简单的包装器,没有子类化hadoop的Writable*接口.
我相信,即使没有测试,我也可以使用旧的mapred接口,但它不是我想要的.你可以给我一些关于使用纯org.apache.avro.mapreduce接口进行编程的例子或解释吗?
此致
贾敏
经过艰苦的搜索,在这个补丁https://issues.apache.org/jira/browse/AVRO-593的帮助下,我发现每个AvroKey和AvroValue包装器必须在作业配置中有一个模式定义.这就是我错过的.
在这里,我有两个选择:
如果保持MyAvroMap和MyAvroReduce不变,我必须为CharSequence定义一个模式,并使用AvroJob为Mapper输出声明这个模式,例如
AvroJob.setMapOutputKeySchema(job,<"defined-schema-for-charsequence">); AvroJob.setMapOutputValueSchema(job,NetflowRecord.getClassSchema());
通过将Mapper输出键/值更改为Text/AvroValue,我只需要为Mapper输出值添加模式声明,就像
job.setMapOutputKeyClass(Text.class); AvroJob.setMapOutputValueSchema(job,NetflowRecord.getClassSchema());
使用mapreduce API,我们不再需要继承AvroMapper和AvroReducer.在这里,我在我的代码中实现了option2而没有addtional模式定义.
贾敏