我正在尝试实现自定义UDT并能够从Spark SQL引用它(如Spark SQL白皮书的第4.4.2节中所述)。
真正的例子是使用Cap'n Proto或类似方法,使自定义UDT由堆外数据结构提供支持。
对于这篇文章,我做了一个人为的例子。我知道我可以只使用Scala案例类,而不必做任何工作,但这不是我的目标。
例如,我有一个Person包含多个属性,并且希望能够SELECT person.first_name FROM person。我遇到了错误Can't extract value from person#1,但不确定为什么。
这是完整的源代码(也可以从https://github.com/andygrove/spark-sql-udt获取)。
package com.theotherandygrove
import org.apache.spark.sql.types._
import org.apache.spark.sql.{Row, SQLContext}
import org.apache.spark.{SparkConf, SparkContext}
object Example {
def main(arg: Array[String]): Unit = {
val conf = new SparkConf()
.setAppName("Example")
.setMaster("local[*]")
val sc = new SparkContext(conf)
val sqlContext = new SQLContext(sc)
val schema = StructType(List(
StructField("person_id", DataTypes.IntegerType, true),
StructField("person", new MockPersonUDT, true)))
// load initial RDD
val rdd = sc.parallelize(List(
MockPersonImpl(1),
MockPersonImpl(2) …Run Code Online (Sandbox Code Playgroud)