XML processing in Spark

Pav*_*ani 18 apache-spark

Scenario: My input will be many small XML files, and I want to read these XMLs as an RDD, perform a join with another dataset to form a new RDD, and send the output as XML.

Is it possible to read the XML with Spark and load the data as an RDD? If so, how would the XML be read?

Sample XML:

<root>
    <users>
        <user>
              <account>1234<\account>
              <name>name_1<\name>
              <number>34233<\number>
         <\user>
         <user>
              <account>58789<\account>
              <name>name_2<\name>
              <number>54697<\number>
         <\user>    
    <\users>
<\root>

How can I load this into an RDD?

zer*_*323 20

Yes, it is possible, but the details will differ depending on the approach you take.

  • If the files are small, as you mentioned, the simplest solution is to load the data with SparkContext.wholeTextFiles. It returns an RDD[(String, String)], where the first element is the file path and the second is the file content. You then parse each file individually, just as you would locally (see the first sketch after this list).
  • For larger files you can use Hadoop input formats.
    • If the structure is simple, you can split records with textinputformat.record.delimiter. You can find a simple example here. The input there is not XML, but it should give you an idea of how to proceed (see the second sketch after this list).
    • Otherwise, Mahout provides an XmlInputFormat.
  • Finally, it is possible to read the file with SparkContext.textFile and adjust afterwards for records that span partition boundaries. Conceptually, this means something like creating a sliding window or partitioning the records into groups of a fixed size:

    • use mapPartitionsWithIndex to identify the records broken between partitions and collect the broken records
    • use a second mapPartitionsWithIndex to repair the broken records
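
For the many-small-files scenario from the question, a minimal sketch of the wholeTextFiles approach could look like the one below. It assumes the closing tags are corrected to valid XML (</user> rather than <\user>) and parses with the standard scala.xml library; the path, app name, and User case class are only placeholders.

import org.apache.spark.{ SparkConf, SparkContext }
import scala.xml.XML

case class User(account: String, name: String, number: String)

val sc = new SparkContext(new SparkConf().setAppName("XmlAsRDD").setMaster("local[*]"))

// RDD[(path, fileContent)] -- one pair per small XML file
val files = sc.wholeTextFiles("path/to/xml/dir")

// parse each file locally and flatten every <user> element into a single RDD[User]
val users = files.flatMap { case (_, content) =>
  val root = XML.loadString(content)
  (root \\ "user").map { u =>
    User((u \ "account").text, (u \ "name").text, (u \ "number").text)
  }
}

The resulting RDD[User] can then be joined with the other dataset like any other RDD.

For the textinputformat.record.delimiter variant, a hedged sketch (reusing the same sc, again assuming valid closing tags) sets the delimiter to the closing tag so that everything up to it becomes one record:

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.io.{ LongWritable, Text }
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat

val hadoopConf = new Configuration(sc.hadoopConfiguration)
// every chunk ending in </user> becomes one record
hadoopConf.set("textinputformat.record.delimiter", "</user>")

val records = sc
  .newAPIHadoopFile("path/to/xml/dir", classOf[TextInputFormat],
    classOf[LongWritable], classOf[Text], hadoopConf)
  .map(_._2.toString)
  .filter(_.contains("<user>")) // drop the trailing fragment after the last user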

Edit:

There is also a relatively new spark-xml package, which allows you to extract specific records by tag:

val df = sqlContext.read
  .format("com.databricks.spark.xml")
  .option("rowTag", "foo")
  .load("bar.xml")
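
Applied to the XML from the question (again assuming the closing tags are corrected to valid XML, and with "users.xml" as a placeholder path), a usage sketch could be:

val users = sqlContext.read
  .format("com.databricks.spark.xml")
  .option("rowTag", "user") // every <user> element becomes one row
  .load("users.xml")

// columns are inferred from the child elements: account, name, number
users.select("account", "name", "number").show()

// drop down to an RDD[Row] if a plain RDD is needed for the join
val usersRdd = users.rdd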


Ksh*_*tha 6

Here is the way to do it -> I used a Hadoop input format to read the XML data in Spark, as explained by zero323.

Input data ->

<root>
    <users>
        <user>
            <account>1234<\account>
            <name>name_1<\name>
            <number>34233<\number>
        <\user>
        <user>
            <account>58789<\account>
            <name>name_2<\name>
            <number>54697<\number>
        <\user>
    <\users>
<\root>

Code for reading the XML input ->

You can get the required jars at this link.

//---------------spark_import
import org.apache.spark.SparkContext
import org.apache.spark.SparkConf
import org.apache.spark.sql.SQLContext

//----------------xml_loader_import
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.io.{ LongWritable, Text }
import com.cloudera.datascience.common.XmlInputFormat

Driver code ->

object Tester_loader {
  case class User(account: String, name: String, number: String)
  def main(args: Array[String]): Unit = {

    val sparkHome = "/usr/big_data_tools/spark-1.5.0-bin-hadoop2.6/"
    val sparkMasterUrl = "spark://SYSTEMX:7077"

    val jars = new Array[String](2)

    jars(0) = "/home/hduser/Offload_Data_Warehouse_Spark.jar"
    jars(1) = "/usr/big_data_tools/JARS/Spark_jar/avro/spark-avro_2.10-2.0.1.jar"

    val conf = new SparkConf().setAppName("XML Reading")
    conf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
      .setMaster("local")
      .set("spark.cassandra.connection.host", "127.0.0.1")
      .setSparkHome(sparkHome)
      .set("spark.executor.memory", "512m")
      .set("spark.default.deployCores", "12")
      .set("spark.cores.max", "12")
      .setJars(jars)

    val sc = new SparkContext(conf)
    val sqlContext = new SQLContext(sc)
    import sqlContext.implicits._

    // ---- loading users from XML

    // read each <user> ... <\user> block as one String record (see readFile below)
    val pages = readFile("src/input_data", "<user>", "<\\user>", sc)

    // extract the individual fields from each record and map them to the User case class
    val xmlUserDF = pages.map { tuple =>
      {
        val account = extractField(tuple, "account")
        val name = extractField(tuple, "name")
        val number = extractField(tuple, "number")

        User(account, name, number)
      }
    }.toDF()
    println(xmlUserDF.count())
    xmlUserDF.show()
  }
Run Code Online (Sandbox Code Playgroud)

The result is a DataFrame; you can convert it back to an RDD as per your requirement (see the sketch after the helper functions below). The helper functions used above are:

  // reads every block between start_tag and end_tag as one String record
  def readFile(path: String, start_tag: String, end_tag: String,
      sc: SparkContext) = {

    val conf = new Configuration()
    conf.set(XmlInputFormat.START_TAG_KEY, start_tag)
    conf.set(XmlInputFormat.END_TAG_KEY, end_tag)
    val rawXmls = sc.newAPIHadoopFile(
        path, classOf[XmlInputFormat], classOf[LongWritable],
        classOf[Text], conf)

    rawXmls.map(p => p._2.toString)
  }

  def extractField(tuple: String, tag: String) = {
    // strip newlines and turn the malformed "<\tag>" closing tags from the input into valid "</tag>" ones
    var value = tuple.replaceAll("\n", " ").replace("<\\", "</")

    if (value.contains("<" + tag + ">") &&
        value.contains("</" + tag + ">")) {
      value = value.split("<" + tag + ">")(1).split("</" + tag + ">")(0)
    }
    value
  }

}
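
As a rough sketch of the DataFrame-to-RDD conversion mentioned above (accessing the columns by name and assuming they were stored as strings, as in the User case class):

// convert the DataFrame back into an RDD[User]
val xmlUserRDD = xmlUserDF.rdd.map { row =>
  User(row.getAs[String]("account"),
       row.getAs[String]("name"),
       row.getAs[String]("number"))
}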

Please try it out and see if it helps.


Sat*_*uri 5

This will help you.

package packagename;

import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SQLContext;
import org.apache.spark.sql.SparkSession;

import com.databricks.spark.xml.XmlReader;

public class XmlreaderSpark {
    public static void main(String arr[]){
    String localxml="file path";
    String booksFileTag = "user";

    String warehouseLocation = "file:" + System.getProperty("user.dir") + "/spark-warehouse";
    System.out.println("warehouseLocation: " + warehouseLocation);
    SparkSession spark = SparkSession
              .builder()
              .master("local")
              .appName("Java Spark SQL Example")
              .config("spark.some.config.option", "some-value").config("spark.sql.warehouse.dir", warehouseLocation)
              .enableHiveSupport().config("set spark.sql.crossJoin.enabled", "true")
              .getOrCreate();
    SQLContext sqlContext = new SQLContext(spark);

    Dataset<Row> df = (new XmlReader()).withRowTag(booksFileTag).xmlFile(sqlContext, localxml);
    df.show();

    }
}

You need to add this dependency to your pom.xml:

<dependency>
   <groupId>com.databricks</groupId>
   <artifactId>spark-xml_2.10</artifactId>
   <version>0.4.0</version>
</dependency>

Also note that your input file is not well-formed XML: the closing tags should use a forward slash (e.g. </account>), not a backslash (<\account>).

Thanks.