有没有办法在Hadoop中为Mapper提供构造函数args?可能通过一些包装创造就业的图书馆?
这是我的情景:
public class HadoopTest {
// Extractor turns a line into a "feature"
public static interface Extractor {
public String extract(String s);
}
// A concrete Extractor, configurable with a constructor parameter
public static class PrefixExtractor implements Extractor {
private int endIndex;
public PrefixExtractor(int endIndex) { this.endIndex = endIndex; }
public String extract(String s) { return s.substring(0, this.endIndex); }
}
public static class Map extends Mapper<Object, Text, Text, Text> {
private Extractor extractor;
// Constructor configures the extractor
public Map(Extractor extractor) { this.extractor = extractor; }
public void map(Object key, Text value, Context context) throws IOException, InterruptedException {
String feature = extractor.extract(value.toString());
context.write(new Text(feature), new Text(value.toString()));
}
}
public static class Reduce extends Reducer<Text, Text, Text, Text> {
public void reduce(Text key, Iterable<Text> values, Context context) throws IOException, InterruptedException {
for (Text val : values) context.write(key, val);
}
}
public static void main(String[] args) throws Exception {
Configuration conf = new Configuration();
Job job = new Job(conf, "test");
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(Text.class);
job.setMapperClass(Map.class);
job.setReducerClass(Reduce.class);
job.setInputFormatClass(TextInputFormat.class);
job.setOutputFormatClass(TextOutputFormat.class);
FileInputFormat.addInputPath(job, new Path(args[0]));
FileOutputFormat.setOutputPath(job, new Path(args[1]));
job.waitForCompletion(true);
}
}
Run Code Online (Sandbox Code Playgroud)
应该清楚,因为Mapper只被赋予Configuration作为类的引用(Map.class),Hadoop无法传递构造函数参数并配置特定的Extractor.
有一些Hadoop包装框架就像Scoobi,Crunch,Scrunch(可能还有更多我不知道的)似乎有这种能力,但我不知道他们是如何实现的. 编辑:在与Scoobi合作之后,我发现我对此有些不对劲.如果在"映射器"中使用外部定义的对象,则Scoobi要求它是可序列化的,并且如果不是,则会在运行时进行抱怨.所以也许正确的方法就是Extractor在Mapper的设置方法中使我的序列化和反序列化......
此外,我实际上在Scala工作,所以非常欢迎基于Scala的解决方案(如果不鼓励!)
我建议告诉你的mapper通过Configuration你正在创建的对象使用哪个提取器.映射器在其setup方法(context.getConfiguration())中接收配置.看起来您不能将对象放在配置中,因为它通常是从XML文件或命令行构造的,但您可以设置枚举值并让映射器自己构造其提取器.在创建映射器之后定制映射器并不是很漂亮,但这就是我对API的解释.
在提交作业时设置实现类名
Configuration conf = new Configuration();
conf.set("PrefixExtractorClass", "com.my.class.ThreePrefixExtractor");
Run Code Online (Sandbox Code Playgroud)
或者使用命令行中的-D选项设置PrefixExtractorClass选项.
下面是mapper中的实现
Extractor extractor = null;
protected void setup(Context context) throws IOException,
InterruptedException
{
try {
Configuration conf = context.getConfiguration();
String className = conf.get("PrefixExtractorClass");
extractor = Class.forName(className);
} Catch (ClassNotFoundException e) {
//handle the exception
}
}
Run Code Online (Sandbox Code Playgroud)
现在使用extractormap函数中所需的对象.
包含com.my.class.ThreePrefixExtractor该类的jar 应该分发给所有节点.以下是来自Cloudera 的一篇文章,介绍了不同的方法.
在上面的例子中com.my.class.ThreePrefixExtractor应该扩展Extractor类.
使用这种方法可以使映射器实现成为通用的.这是大多数框架采用的方法(使用Class.forName)来实现可实现特定接口的可插入组件.
到目前为止我想出的最好的解决方案是将我想要的对象的序列化版本传递给Mapper,并使用反射在运行时构造该对象。
所以,主要方法会这样说:
conf.set("ExtractorConstructor", "dicta03.hw4.PrefixExtractor(3)");
Run Code Online (Sandbox Code Playgroud)
然后,在映射器中我们使用辅助函数construct(定义如下)并且可以说:
public void setup(Context context) {
try {
String constructor = context.getConfiguration().get("ExtractorConstructor");
this.extractor = (Extractor) construct(constructor);
} catch (Exception e) {
throw new RuntimeException(e);
}
}
Run Code Online (Sandbox Code Playgroud)
定义construct使用反射在运行时从字符串递归构造对象:
public static Object construct(String s) throws ClassNotFoundException, NoSuchMethodException, IllegalAccessException, InstantiationException, InvocationTargetException {
if (s.matches("^[A-Za-z0-9.#]+\\(.*\\)$")) {
Class cls = null;
List<Object> argList = new ArrayList<Object>();
int parenCount = 0;
boolean quoted = false;
boolean escaped = false;
int argStart = -1;
for (int i = 0; i < s.length(); i++) {
if (escaped) {
escaped = false;
} else if (s.charAt(i) == '\\') {
escaped = true;
} else if (s.charAt(i) == '"') {
quoted = true;
} else if (!quoted) {
if (s.charAt(i) == '(') {
if (cls == null)
cls = Class.forName(s.substring(0, i));
parenCount++;
argStart = i + 1;
} else if (s.charAt(i) == ')') {
if (parenCount == 1)
argList.add(construct(s.substring(argStart, i)));
parenCount--;
} else if (s.charAt(i) == ',') {
if (parenCount == 1) {
argList.add(construct(s.substring(argStart, i)));
argStart = i + 1;
}
}
}
}
Object[] args = new Object[argList.size()];
Class[] argTypes = new Class[argList.size()];
for (int i = 0; i < argList.size(); i++) {
argTypes[i] = argList.get(i).getClass();
args[i] = argList.get(i);
}
Constructor constructor = cls.getConstructor(argTypes);
return constructor.newInstance(args);
} else if (s.matches("^\".*\"$")) {
return s.substring(1, s.length() - 1);
} else if (s.matches("^\\d+$")) {
return Integer.parseInt(s);
} else {
throw new RuntimeException("Cannot construct " + s);
}
}
Run Code Online (Sandbox Code Playgroud)
(这可能不是最强大的解析器,但它可以轻松扩展以覆盖更多类型的对象。)
| 归档时间: |
|
| 查看次数: |
5539 次 |
| 最近记录: |