如何使用 spark 将镶木地板数据转换为案例类?

jbr*_*own 2 scala apache-spark apache-spark-sql

我有很多案例类,我在 spark 中使用它们将数据保存为镶木地板,例如:

case class Person(userId: String,
              technographic: Option[Technographic] = None,
              geographic: Option[Geographic] = None)

case class Technographic(browsers: Seq[Browser], 
                     devices: Seq[Device],
                     oss: Seq[Os])

case class Browser(family: String,
               major: Option[String] = None, 
               language: String

...
Run Code Online (Sandbox Code Playgroud)

如何将磁盘上的数据转换回这些案例类?

我需要能够选择多个列并展开它们,以便每个列表(例如browsers)的所有子列表都具有相同的长度。

例如,鉴于此原始数据:

Person(userId="1234",
  technographic=Some(Technographic(browsers=Seq(
    Browser(family=Some("IE"), major=Some(7), language=Some("en")),
    Browser(family=None, major=None, language=Some("en-us")),
    Browser(family=Some("Firefox), major=None, language=None)
  )),
  geographic=Some(Geographic(...))
)
Run Code Online (Sandbox Code Playgroud)

我需要,例如浏览器数据如下(以及能够选择所有列):

family=IE, major=7, language=en
family=None, major=None, language=en-us
family=Firefox, major=None, language=None
Run Code Online (Sandbox Code Playgroud)

如果 spark 可以explode每个列表项,我就可以得到。目前它只会做类似的事情(无论如何explode都不会处理多列):

browsers.family = ["IE", "Firefox"]
browsers.major = [7]
browsers.language = ["en", "en-us"]
Run Code Online (Sandbox Code Playgroud)

那么如何使用 spark 1.5.2 从所有这些嵌套的可选数据中重建用户记录(生成一行数据的整个案例类集)?

一种可能的方法是:

val df = sqlContext.read.parquet(inputPath)
df.registerTempTable("person")
val fields = df.select("desc person")
df.select("select * from person").map { x => 
  ... // somehow zip `fields` with the values so that I can 
      // access values by column name instead of index 
      // (which is brittle), but how?
}
Run Code Online (Sandbox Code Playgroud)

Ber*_*ium 5

给定的

case class Browser(family: String,
                   major: Option[Int] = None,
                   language: String)

case class Tech(browsers: Seq[Browser],
                devices: Seq[String],
                oss: Seq[String])


case class Person(userId: String,
                  tech: Option[Tech] = None,
                  geographic: Option[String] = None)
Run Code Online (Sandbox Code Playgroud)

和一些方便的类型/功能 org.apache.spark.sql.Row

type A[E] = collection.mutable.WrappedArray[E]

implicit class RichRow(val r: Row) {
  def getOpt[T](n: String): Option[T] = {
    if (isNullAt(n)) {
      None
    } else {
      Some(r.getAs[T](n))
    }
  }

  def getStringOpt(n: String) = getOpt[String](n)
  def getString(n: String) = getStringOpt(n).get

  def getIntOpt(n: String) = getOpt[Int](n)
  def getInt(n: String) = r.getIntOpt(n).get

  def getArray[T](n: String) = r.getAs[A[T]](n)

  def getRow(n: String) = r.getAs[Row](n)
  def getRows(n: String) = r.getAs[A[Row]](n)

  def isNullAt(n: String) = r.isNullAt(r.fieldIndex(n))
}
Run Code Online (Sandbox Code Playgroud)

那么解析可以组织在一些函数中:

def toBrowser(r: Row): Browser = {
  Browser(
    r.getString("family"),
    r.getIntOpt("major"),
    r.getString("language"))
}

def toBrowsers(rows: A[Row]): Seq[Browser] = {
  rows.map(toBrowser)
}

def toTech(r: Row): Tech = {
  Tech(
    toBrowsers(r.getRows("browsers")),
    r.getArray[String]("devices"),
    r.getArray[String]("oss"))
}

def toTechOpt(r: Row): Option[Tech] = {
  Option(r).map(toTech)
}

def toPerson(r: Row): Person = {
  Person(
    r.getString("userId"),
    toTechOpt(r.getRow("tech")),
    r.getStringOpt("geographic"))
}
Run Code Online (Sandbox Code Playgroud)

所以你可以写

df.map(toPerson).collect().foreach(println)
Run Code Online (Sandbox Code Playgroud)
  • 我已将解析函数组织为“独立”方法。我通常会将它们作为applycase 类的伴随对象或作为隐式值类Row。函数的原因是这样更容易粘贴到spark-shell

  • 每个解析函数直接处理普通列和数组,但在遇到集合时委托给另一个函数(Seq并且Option- 这些代表下一个嵌套级别)

  • implict class应该extend AnyVal,但这又不能粘贴到spark-shell