如何用AVRO org.apache.avro.mapreduce接口编程?

cae*_*301 4 hadoop mapreduce avro

我的所有程序都使用hadoop的新MR1接口(org.apache.hadoop.mapreduce)编写,所以我也想使用avro的新的org.apache.avro.mapreduce.但它对我不起作用.

该程序接收avro数据的输入并输出相同的数据.我的程序背后的主要思想是将hadoop的Mapper和Reducer子类化为avro包装的键/值.这是我的工作驱动程序块:

    AvroJob.setInputKeySchema(job, NetflowRecord.getClassSchema());
    AvroJob.setOutputKeySchema(job, NetflowRecord.getClassSchema());

    job.setMapperClass(MyAvroMap.class);
    job.setReducerClass(MyAvroReduce.class);

    job.setInputFormatClass(AvroKeyInputFormat.class);
    job.setOutputFormatClass(AvroKeyOutputFormat.class);

    job.setMapOutputKeyClass(AvroKey.class);
    job.setMapOutputValueClass(AvroValue.class);

    job.setOutputKeyClass(AvroKey.class);
    job.setOutputValueClass(NullWritable.class);
Run Code Online (Sandbox Code Playgroud)

MyAvroMap和MyAvroReduce子类的定义分别是

public static class MyAvroMap extends Mapper<AvroKey<NetflowRecord>, NullWritable,
            AvroKey<CharSequence>, AvroValue<NetflowRecord>>{ ... }

public static class MyAvroReduce extends Reducer<AvroKey<CharSequence>, AvroValue<NetflowRecord>, 
                AvroKey<NetflowRecord>, NullWritable>{ ... }
Run Code Online (Sandbox Code Playgroud)

有效的NetflowRecord是我的avro记录类.我得到了例外

java.lang.ClassCastException: class org.apache.avro.hadoop.io.AvroKey
Run Code Online (Sandbox Code Playgroud)

通过阅读hadoop和avro的源代码,我发现JobConf抛出异常以确保map键是WritableComparable的子类,就像这样(hadoop1.2.1,line759)

WritableComparator.get(getMapOutputKeyClass().asSubclass(WritableComparable.class));
Run Code Online (Sandbox Code Playgroud)

但avro表明,AvroKey和AvroValue只是一个简单的包装器,没有子类化hadoop的Writable*接口.

我相信,即使没有测试,我也可以使用旧的mapred接口,但它不是我想要的.你可以给我一些关于使用纯org.apache.avro.mapreduce接口进行编程的例子或解释吗?

此致

贾敏

cae*_*301 5

经过艰苦的搜索,在这个补丁https://issues.apache.org/jira/browse/AVRO-593的帮助下,我发现每个AvroKey和AvroValue包装器必须在作业配置中有一个模式定义.这就是我错过的.

在这里,我有两个选择:

  1. 如果保持MyAvroMap和MyAvroReduce不变,我必须为CharSequence定义一个模式,并使用AvroJob为Mapper输出声明这个模式,例如

    AvroJob.setMapOutputKeySchema(job,<"defined-schema-for-charsequence">); AvroJob.setMapOutputValueSchema(job,NetflowRecord.getClassSchema());

  2. 通过将Mapper输出键/值更改为Text/AvroValue,我只需要为Mapper输出值添加模式声明,就像

    job.setMapOutputKeyClass(Text.class); AvroJob.setMapOutputValueSchema(job,NetflowRecord.getClassSchema());

使用mapreduce API,我们不再需要继承AvroMapper和AvroReducer.在这里,我在我的代码中实现了option2而没有addtional模式定义.

贾敏