jbr*_*own 2 scala apache-spark apache-spark-sql
我有很多案例类,我在 spark 中使用它们将数据保存为镶木地板,例如:
case class Person(userId: String,
technographic: Option[Technographic] = None,
geographic: Option[Geographic] = None)
case class Technographic(browsers: Seq[Browser],
devices: Seq[Device],
oss: Seq[Os])
case class Browser(family: String,
major: Option[String] = None,
language: String
...
Run Code Online (Sandbox Code Playgroud)
如何将磁盘上的数据转换回这些案例类?
我需要能够选择多个列并展开它们,以便每个列表(例如browsers)的所有子列表都具有相同的长度。
例如,鉴于此原始数据:
Person(userId="1234",
technographic=Some(Technographic(browsers=Seq(
Browser(family=Some("IE"), major=Some(7), language=Some("en")),
Browser(family=None, major=None, language=Some("en-us")),
Browser(family=Some("Firefox), major=None, language=None)
)),
geographic=Some(Geographic(...))
)
Run Code Online (Sandbox Code Playgroud)
我需要,例如浏览器数据如下(以及能够选择所有列):
family=IE, major=7, language=en
family=None, major=None, language=en-us
family=Firefox, major=None, language=None
Run Code Online (Sandbox Code Playgroud)
如果 spark 可以explode每个列表项,我就可以得到。目前它只会做类似的事情(无论如何explode都不会处理多列):
browsers.family = ["IE", "Firefox"]
browsers.major = [7]
browsers.language = ["en", "en-us"]
Run Code Online (Sandbox Code Playgroud)
那么如何使用 spark 1.5.2 从所有这些嵌套的可选数据中重建用户记录(生成一行数据的整个案例类集)?
一种可能的方法是:
val df = sqlContext.read.parquet(inputPath)
df.registerTempTable("person")
val fields = df.select("desc person")
df.select("select * from person").map { x =>
... // somehow zip `fields` with the values so that I can
// access values by column name instead of index
// (which is brittle), but how?
}
Run Code Online (Sandbox Code Playgroud)
给定的
case class Browser(family: String,
major: Option[Int] = None,
language: String)
case class Tech(browsers: Seq[Browser],
devices: Seq[String],
oss: Seq[String])
case class Person(userId: String,
tech: Option[Tech] = None,
geographic: Option[String] = None)
Run Code Online (Sandbox Code Playgroud)
和一些方便的类型/功能 org.apache.spark.sql.Row
type A[E] = collection.mutable.WrappedArray[E]
implicit class RichRow(val r: Row) {
def getOpt[T](n: String): Option[T] = {
if (isNullAt(n)) {
None
} else {
Some(r.getAs[T](n))
}
}
def getStringOpt(n: String) = getOpt[String](n)
def getString(n: String) = getStringOpt(n).get
def getIntOpt(n: String) = getOpt[Int](n)
def getInt(n: String) = r.getIntOpt(n).get
def getArray[T](n: String) = r.getAs[A[T]](n)
def getRow(n: String) = r.getAs[Row](n)
def getRows(n: String) = r.getAs[A[Row]](n)
def isNullAt(n: String) = r.isNullAt(r.fieldIndex(n))
}
Run Code Online (Sandbox Code Playgroud)
那么解析可以组织在一些函数中:
def toBrowser(r: Row): Browser = {
Browser(
r.getString("family"),
r.getIntOpt("major"),
r.getString("language"))
}
def toBrowsers(rows: A[Row]): Seq[Browser] = {
rows.map(toBrowser)
}
def toTech(r: Row): Tech = {
Tech(
toBrowsers(r.getRows("browsers")),
r.getArray[String]("devices"),
r.getArray[String]("oss"))
}
def toTechOpt(r: Row): Option[Tech] = {
Option(r).map(toTech)
}
def toPerson(r: Row): Person = {
Person(
r.getString("userId"),
toTechOpt(r.getRow("tech")),
r.getStringOpt("geographic"))
}
Run Code Online (Sandbox Code Playgroud)
所以你可以写
df.map(toPerson).collect().foreach(println)
Run Code Online (Sandbox Code Playgroud)
我已将解析函数组织为“独立”方法。我通常会将它们作为applycase 类的伴随对象或作为隐式值类Row。函数的原因是这样更容易粘贴到spark-shell
每个解析函数直接处理普通列和数组,但在遇到集合时委托给另一个函数(Seq并且Option- 这些代表下一个嵌套级别)
本implict class应该extend AnyVal,但这又不能粘贴到spark-shell
| 归档时间: |
|
| 查看次数: |
3431 次 |
| 最近记录: |