小智 5
import java.io.ByteArrayInputStream;\nimport java.io.IOException;\n\nimport javax.xml.stream.XMLInputFactory;\nimport javax.xml.stream.XMLStreamConstants;\nimport javax.xml.stream.XMLStreamReader;\n\nimport org.apache.hadoop.conf.Configuration;\nimport org.apache.hadoop.fs.FSDataInputStream;\nimport org.apache.hadoop.fs.FileSystem;\nimport org.apache.hadoop.fs.Path;\nimport org.apache.hadoop.io.DataOutputBuffer;\nimport org.apache.hadoop.io.LongWritable;\nimport org.apache.hadoop.io.Text;\nimport org.apache.hadoop.mapreduce.InputSplit;\nimport org.apache.hadoop.mapreduce.Job;\nimport org.apache.hadoop.mapreduce.Mapper;\nimport org.apache.hadoop.mapreduce.RecordReader;\nimport org.apache.hadoop.mapreduce.Reducer;\nimport org.apache.hadoop.mapreduce.TaskAttemptContext;\nimport org.apache.hadoop.mapreduce.lib.input.FileInputFormat;\nimport org.apache.hadoop.mapreduce.lib.input.FileSplit;\nimport org.apache.hadoop.mapreduce.lib.input.TextInputFormat;\nimport org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;\nimport org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;\n\npublic class XmlDriver\n{\n\n public static class XmlInputFormat1 extends TextInputFormat {\n\n public static final String START_TAG_KEY = "xmlinput.start";\n public static final String END_TAG_KEY = "xmlinput.end";\n\n\n public RecordReader<LongWritable, Text> createRecordReader(\n InputSplit split, TaskAttemptContext context) {\n return new XmlRecordReader();\n }\n\n /**\n * XMLRecordReader class to read through a given xml document to output\n * xml blocks as records as specified by the start tag and end tag\n *\n */\n\n public static class XmlRecordReader extends\n RecordReader<LongWritable, Text> {\n private byte[] startTag;\n private byte[] endTag;\n private long start;\n private long end;\n private FSDataInputStream fsin;\n private DataOutputBuffer buffer = new DataOutputBuffer();\n\n private LongWritable key = new LongWritable();\n private Text value = new Text();\n @Override\n public 
void initialize(InputSplit split, TaskAttemptContext context)\n throws IOException, InterruptedException {\n Configuration conf = context.getConfiguration();\n startTag = conf.get(START_TAG_KEY).getBytes("utf-8");\n endTag = conf.get(END_TAG_KEY).getBytes("utf-8");\n FileSplit fileSplit = (FileSplit) split;\n\n // open the file and seek to the start of the split\n start = fileSplit.getStart();\n end = start + fileSplit.getLength();\n Path file = fileSplit.getPath();\n FileSystem fs = file.getFileSystem(conf);\n fsin = fs.open(fileSplit.getPath());\n fsin.seek(start);\n\n }\n @Override\n public boolean nextKeyValue() throws IOException,\n InterruptedException {\n if (fsin.getPos() < end) {\n if (readUntilMatch(startTag, false)) {\n try {\n buffer.write(startTag);\n if (readUntilMatch(endTag, true)) {\n key.set(fsin.getPos());\n value.set(buffer.getData(), 0,\n buffer.getLength());\n return true;\n }\n } finally {\n buffer.reset();\n }\n }\n }\n return false;\n }\n @Override\n public LongWritable getCurrentKey() throws IOException,\n InterruptedException {\n return key;\n }\n\n @Override\n public Text getCurrentValue() throws IOException,\n InterruptedException {\n return value;\n }\n @Override\n public void close() throws IOException {\n fsin.close();\n }\n @Override\n public float getProgress() throws IOException {\n return (fsin.getPos() - start) / (float) (end - start);\n }\n\n private boolean readUntilMatch(byte[] match, boolean withinBlock)\n throws IOException {\n int i = 0;\n while (true) {\n int b = fsin.read();\n // end of file:\n if (b == -1)\n return false;\n // save to buffer:\n if (withinBlock)\n buffer.write(b);\n // check if we\'re matching:\n if (b == match[i]) {\n i++;\n if (i >= match.length)\n return true;\n } else\n i = 0;\n // see if we\'ve passed the stop point:\n if (!withinBlock && i == 0 && fsin.getPos() >= end)\n return false;\n }\n }\n }\n}\n\n\npublic static class Map extends Mapper<LongWritable, Text,\nText, Text> {\n @Override\n 
protected void map(LongWritable key, Text value,\n Mapper.Context context)\n throws\n IOException, InterruptedException {\n String document = value.toString();\n System.out.println("\xe2\x80\x98" + document + "\xe2\x80\x98");\n try {\n XMLStreamReader reader =\n XMLInputFactory.newInstance().createXMLStreamReader(new\n ByteArrayInputStream(document.getBytes()));\n String propertyName = "";\n String propertyValue = "";\n String currentElement = "";\n while (reader.hasNext()) {\n int code = reader.next();\n switch (code) {\n case XMLStreamConstants.START_ELEMENT: //START_ELEMENT:\n currentElement = reader.getLocalName();\n break;\n case XMLStreamConstants.CHARACTERS: //CHARACTERS:\n if (currentElement.equalsIgnoreCase("name")) {\n propertyName += reader.getText();\n System.out.println("propertName"+propertyName);\n } else if (currentElement.equalsIgnoreCase("value")) {\n propertyValue += reader.getText();\n System.out.println("propertyValue"+propertyValue);\n }\n break;\n }\n }\n reader.close();\n context.write(new Text(propertyName.trim()), new Text(propertyValue.trim()));\n\n }\n catch(Exception e){\n throw new IOException(e);\n\n }\n\n }\n}\npublic static class Reduce\nextends Reducer<Text, Text, Text, Text> {\n\n @Override\n protected void setup(\n Context context)\n throws IOException, InterruptedException {\n context.write(new Text("<configuration>"), null);\n }\n\n @Override\n protected void cleanup(\n Context context)\n throws IOException, InterruptedException {\n context.write(new Text("</configuration>"), null);\n }\n\n private Text outputKey = new Text();\n public void reduce(Text key, Iterable<Text> values,\n Context context)\n throws IOException, InterruptedException {\n for (Text value : values) {\n outputKey.set(constructPropertyXml(key, value));\n\n context.write(outputKey, null);\n }\n }\n\n public static String constructPropertyXml(Text name, Text value) {\n StringBuilder sb = new StringBuilder();\n sb.append("<property><name>").append(name)\n 
.append("</name><value>").append(value)\n .append("</value></property>");\n return sb.toString();\n }\n}\n\n\n\npublic static void main(String[] args) throws Exception\n{\n Configuration conf = new Configuration();\n\n conf.set("xmlinput.start", "<property>");\n conf.set("xmlinput.end", "</property>");\n Job job = new Job(conf);\n job.setJarByClass(XmlDriver.class);\n job.setOutputKeyClass(Text.class);\n job.setOutputValueClass(Text.class);\n\n job.setMapperClass(XmlDriver.Map.class);\n job.setReducerClass(XmlDriver.Reduce.class);\n\n job.setInputFormatClass(XmlInputFormat1.class);\n job.setOutputFormatClass(TextOutputFormat.class);\n\n FileInputFormat.addInputPath(job, new Path(args[0]));\n FileOutputFormat.setOutputPath(job, new Path(args[1]));\n\n job.waitForCompletion(true);\n }\n}\nRun Code Online (Sandbox Code Playgroud)\n\n参数:- /home/test.xml /home/out
Xml文件(test.xml):

<configuration>
<property>
 <name>dfs.replication</name>
 <value>1</value>
 </property>
<property>
 <name>dfs</name>
 <value>2</value>
</property>
</configuration>
Run Code Online (Sandbox Code Playgroud)
| 归档时间: |
|
| 查看次数: |
12946 次 |
| 最近记录: |