在运行简单的 MapReduce 程序时获取 java.lang.ClassCastException: class java.lang.String

add*_*015 5 java hadoop mapreduce classcastexception

我正在尝试执行一个简单的 MapReduce 程序,其中 Map 接受输入,将其分为两部分(键=>字符串和值=>整数)。reducer 总结了我每次都会收到 ClassCastException 的相应键的值。我无法理解代码中的什么导致了这个错误

我的代码:

import java.io.IOException;
import java.util.Iterator;

import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.FileInputFormat;
import org.apache.hadoop.mapred.FileOutputFormat;
import org.apache.hadoop.mapred.JobClient;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.MapReduceBase;
import org.apache.hadoop.mapred.Mapper;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.Reducer;
import org.apache.hadoop.mapred.Reporter;
import org.apache.hadoop.mapred.TextInputFormat;
import org.apache.hadoop.mapred.TextOutputFormat;

public class Test {
public static class Map extends MapReduceBase implements
        Mapper<LongWritable, Text, String, Integer> {

    @Override
    public void map(LongWritable key, Text value,
            OutputCollector<String, Integer> output, Reporter reporter)
            throws IOException {
        String line = value.toString();
        String[] lineParts = line.split(",");
        output.collect(lineParts[0], Integer.parseInt(lineParts[1]));

    }
}

public static class Reduce extends MapReduceBase implements
        Reducer<String, Integer, String, Integer> {

    @Override
    public void reduce(String key, Iterator<Integer> values,
            OutputCollector<String, Integer> output, Reporter reporter)
            throws IOException {
        int sum = 0;
        while (values.hasNext()) {
            sum = sum + values.next();
        }
        output.collect(key, sum);
    }
}

public static void main(String[] args) throws Exception {

    JobConf conf = new JobConf(Test.class);
    conf.setJobName("ProductCount");

    conf.setMapOutputKeyClass(String.class);
    conf.setMapOutputValueClass(Integer.class);

    conf.setOutputKeyClass(String.class);
    conf.setOutputValueClass(Integer.class);

    conf.setMapperClass(Map.class);
    conf.setReducerClass(Reduce.class);

    conf.setInputFormat(TextInputFormat.class);
    conf.setOutputFormat(TextOutputFormat.class);

    FileInputFormat.setInputPaths(conf, new Path(args[0]));
    FileOutputFormat.setOutputPath(conf, new Path(args[1]));

    JobClient.runJob(conf);

}
}
Run Code Online (Sandbox Code Playgroud)

样本数据:

abc,10
abc,10
abc,10
def,9
def,9
Run Code Online (Sandbox Code Playgroud)

以下是堆栈跟踪。这和我的键值有什么关系吗?

14/02/11 23:57:35 INFO mapred.JobClient: Task Id : attempt_201402110240_0013_m_000001_2, Status : FAILED
java.lang.ClassCastException: class java.lang.String
at java.lang.Class.asSubclass(Class.java:3018)
at org.apache.hadoop.mapred.JobConf.getOutputKeyComparator(JobConf.java:795)
at org.apache.hadoop.mapred.MapTask$MapOutputBuffer.<init>(MapTask.java:816)
at org.apache.hadoop.mapred.MapTask.runOldMapper(MapTask.java:382)
at org.apache.hadoop.mapred.MapTask.run(MapTask.java:324)
at org.apache.hadoop.mapred.Child$4.run(Child.java:268)
at java.security.AccessController.doPrivileged(Native Method)
at javax.security.auth.Subject.doAs(Subject.java:396)
at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1115)
at org.apache.hadoop.mapred.Child.main(Child.java:262)


Exception in thread "main" java.io.IOException: Job failed!
at org.apache.hadoop.mapred.JobClient.runJob(JobClient.java:1246)
at Test.main(Test.java:69)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:39)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:25)
at java.lang.reflect.Method.invoke(Method.java:597)
at org.apache.hadoop.util.RunJar.main(RunJar.java:186)
Run Code Online (Sandbox Code Playgroud)

snr*_*rlx 3

在我看来,您似乎没有为输出使用正确的类。

来自 MapReduce教程之一:

键和值类必须由框架可序列化,因此需要实现 Writable 接口。此外,关键类必须实现 WritableComparable 接口,以方便框架进行排序。

因此您应该替换String.classText.classInteger.classIntWritable.class

我希望这能解决你的问题。

为什么我不能使用基本的 String 或 Integer 类?

Integer 和 String 实现了 Java 的标准可序列化接口,如文档中所示。问题是 MapReduce 序列化/反序列化值时不使用此标准接口,而是使用自己的接口,称为Writable

那么他们为什么不直接使用基本的 Java 接口呢?

简短的回答:因为它更有效率。可写接口在序列化时省略类型定义,因为您已经在 MapReduce 代码中定义了输入/输出的类型。由于您的代码已经知道即将发生什么,因此不要像这样序列化字符串:

String: "theStringItself"
Run Code Online (Sandbox Code Playgroud)

它可以被序列化为:

theStringItself
Run Code Online (Sandbox Code Playgroud)

正如您所看到的,这节省了大量的内存。

长答案:阅读这篇精彩的博客文章