通过HADOOP进行XML解析

trp*_*trp 1 hadoop

我有大量 WIKI 数据需要分析。这些转储基本上是 XML 文件。如何使用HADOOP MapReduce解析XML文件?

小智 5

import java.io.ByteArrayInputStream;\nimport java.io.IOException;\n\nimport javax.xml.stream.XMLInputFactory;\nimport javax.xml.stream.XMLStreamConstants;\nimport javax.xml.stream.XMLStreamReader;\n\nimport org.apache.hadoop.conf.Configuration;\nimport org.apache.hadoop.fs.FSDataInputStream;\nimport org.apache.hadoop.fs.FileSystem;\nimport org.apache.hadoop.fs.Path;\nimport org.apache.hadoop.io.DataOutputBuffer;\nimport org.apache.hadoop.io.LongWritable;\nimport org.apache.hadoop.io.Text;\nimport org.apache.hadoop.mapreduce.InputSplit;\nimport org.apache.hadoop.mapreduce.Job;\nimport org.apache.hadoop.mapreduce.Mapper;\nimport org.apache.hadoop.mapreduce.RecordReader;\nimport org.apache.hadoop.mapreduce.Reducer;\nimport org.apache.hadoop.mapreduce.TaskAttemptContext;\nimport org.apache.hadoop.mapreduce.lib.input.FileInputFormat;\nimport org.apache.hadoop.mapreduce.lib.input.FileSplit;\nimport org.apache.hadoop.mapreduce.lib.input.TextInputFormat;\nimport org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;\nimport org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;\n\npublic class XmlDriver\n{\n\n public static class XmlInputFormat1 extends TextInputFormat {\n\n    public static final String START_TAG_KEY = "xmlinput.start";\n    public static final String END_TAG_KEY = "xmlinput.end";\n\n\n    public RecordReader<LongWritable, Text> createRecordReader(\n            InputSplit split, TaskAttemptContext context) {\n        return new XmlRecordReader();\n    }\n\n    /**\n     * XMLRecordReader class to read through a given xml document to output\n     * xml blocks as records as specified by the start tag and end tag\n     *\n     */\n\n    public static class XmlRecordReader extends\n    RecordReader<LongWritable, Text> {\n        private byte[] startTag;\n        private byte[] endTag;\n        private long start;\n        private long end;\n        private FSDataInputStream fsin;\n        private DataOutputBuffer buffer = new DataOutputBuffer();\n\n        private LongWritable key = new LongWritable();\n        private Text value = new Text();\n        @Override\n        public void initialize(InputSplit split, TaskAttemptContext context)\n        throws IOException, InterruptedException {\n            Configuration conf = context.getConfiguration();\n            startTag = conf.get(START_TAG_KEY).getBytes("utf-8");\n            endTag = conf.get(END_TAG_KEY).getBytes("utf-8");\n            FileSplit fileSplit = (FileSplit) split;\n\n            // open the file and seek to the start of the split\n            start = fileSplit.getStart();\n            end = start + fileSplit.getLength();\n            Path file = fileSplit.getPath();\n            FileSystem fs = file.getFileSystem(conf);\n            fsin = fs.open(fileSplit.getPath());\n            fsin.seek(start);\n\n        }\n        @Override\n        public boolean nextKeyValue() throws IOException,\n        InterruptedException {\n            if (fsin.getPos() < end) {\n                if (readUntilMatch(startTag, false)) {\n                    try {\n                        buffer.write(startTag);\n                        if (readUntilMatch(endTag, true)) {\n                            key.set(fsin.getPos());\n                            value.set(buffer.getData(), 0,\n                                    buffer.getLength());\n                            return true;\n                        }\n                    } finally {\n                        buffer.reset();\n                    }\n                }\n            }\n            return false;\n        }\n        @Override\n        public LongWritable getCurrentKey() throws IOException,\n        InterruptedException {\n            return key;\n        }\n\n        @Override\n        public Text getCurrentValue() throws IOException,\n        InterruptedException {\n            return value;\n        }\n        @Override\n        public void close() throws IOException {\n            fsin.close();\n        }\n        @Override\n        public float getProgress() throws IOException {\n            return (fsin.getPos() - start) / (float) (end - start);\n        }\n\n        private boolean readUntilMatch(byte[] match, boolean withinBlock)\n        throws IOException {\n            int i = 0;\n            while (true) {\n                int b = fsin.read();\n                // end of file:\n                    if (b == -1)\n                        return false;\n                // save to buffer:\n                    if (withinBlock)\n                        buffer.write(b);\n                // check if we\'re matching:\n                    if (b == match[i]) {\n                        i++;\n                        if (i >= match.length)\n                            return true;\n                    } else\n                        i = 0;\n                    // see if we\'ve passed the stop point:\n                    if (!withinBlock && i == 0 && fsin.getPos() >= end)\n                        return false;\n            }\n        }\n    }\n}\n\n\npublic static class Map extends Mapper<LongWritable, Text,\nText, Text> {\n    @Override\n    protected void map(LongWritable key, Text value,\n            Mapper.Context context)\n    throws\n    IOException, InterruptedException {\n        String document = value.toString();\n        System.out.println("\xe2\x80\x98" + document + "\xe2\x80\x98");\n        try {\n            XMLStreamReader reader =\n                XMLInputFactory.newInstance().createXMLStreamReader(new\n                        ByteArrayInputStream(document.getBytes()));\n            String propertyName = "";\n            String propertyValue = "";\n            String currentElement = "";\n            while (reader.hasNext()) {\n                int code = reader.next();\n                switch (code) {\n                case XMLStreamConstants.START_ELEMENT: //START_ELEMENT:\n                    currentElement = reader.getLocalName();\n                    break;\n                case XMLStreamConstants.CHARACTERS: //CHARACTERS:\n                    if (currentElement.equalsIgnoreCase("name")) {\n                        propertyName += reader.getText();\n                        System.out.println("propertName"+propertyName);\n                    } else if (currentElement.equalsIgnoreCase("value")) {\n                        propertyValue += reader.getText();\n                        System.out.println("propertyValue"+propertyValue);\n                    }\n                    break;\n                }\n            }\n            reader.close();\n            context.write(new Text(propertyName.trim()), new Text(propertyValue.trim()));\n\n        }\n        catch(Exception e){\n            throw new IOException(e);\n\n        }\n\n    }\n}\npublic static class Reduce\nextends Reducer<Text, Text, Text, Text> {\n\n    @Override\n    protected void setup(\n            Context context)\n    throws IOException, InterruptedException {\n        context.write(new Text("<configuration>"), null);\n    }\n\n    @Override\n    protected void cleanup(\n            Context context)\n    throws IOException, InterruptedException {\n        context.write(new Text("</configuration>"), null);\n    }\n\n    private Text outputKey = new Text();\n    public void reduce(Text key, Iterable<Text> values,\n            Context context)\n    throws IOException, InterruptedException {\n        for (Text value : values) {\n            outputKey.set(constructPropertyXml(key, value));\n\n            context.write(outputKey, null);\n        }\n    }\n\n    public static String constructPropertyXml(Text name, Text value) {\n        StringBuilder sb = new StringBuilder();\n        sb.append("<property><name>").append(name)\n        .append("</name><value>").append(value)\n        .append("</value></property>");\n        return sb.toString();\n    }\n}\n\n\n\npublic static void main(String[] args) throws Exception\n{\n    Configuration conf = new Configuration();\n\n    conf.set("xmlinput.start", "<property>");\n    conf.set("xmlinput.end", "</property>");\n    Job job = new Job(conf);\n    job.setJarByClass(XmlDriver.class);\n    job.setOutputKeyClass(Text.class);\n    job.setOutputValueClass(Text.class);\n\n    job.setMapperClass(XmlDriver.Map.class);\n    job.setReducerClass(XmlDriver.Reduce.class);\n\n    job.setInputFormatClass(XmlInputFormat1.class);\n    job.setOutputFormatClass(TextOutputFormat.class);\n\n    FileInputFormat.addInputPath(job, new Path(args[0]));\n    FileOutputFormat.setOutputPath(job, new Path(args[1]));\n\n    job.waitForCompletion(true);\n  }\n}\n
Run Code Online (Sandbox Code Playgroud)\n\n

参数:- /home/test.xml /home/out

\n\n

Xml文件(test.xml):

\n\n
<configuration>\n<property>\n        <name>dfs.replication</name>\n        <value>1</value>\n </property>\n<property>\n    <name>dfs</name>\n    <value>2</value>\n</property>\n</configuration>\n
Run Code Online (Sandbox Code Playgroud)\n