Spark SQL - How do I write a custom encoder for a custom object that has a Map[String, Any] as a field?

Aje*_*eet 5 hadoop scala apache-spark apache-spark-sql

I am trying to load a CSV as a DataFrame of a custom object type:

case class Geom(attributes: Map[String, Any])

I tried this:

import session.implicits._

implicit val mapEncoder = org.apache.spark.sql.Encoders.kryo[Map[String, Any]]
implicit val geomEncoder = org.apache.spark.sql.Encoders.product[Geom]

val sparkSQLGeometryRDD = session.read
  .option("delimiter", "\t")
  .option("inferSchema", "true")
  .option("header", "true")
  .csv("src\\main\\resources\\TexasPostCodes.txt")
  //.as[MyObjEncoded]//(encoder)
  .persist()

val columns = sparkSQLGeometryRDD.schema.fieldNames
//sparkSQLGeometryRDD.show()

val mappedDF = sparkSQLGeometryRDD
  .map(x => x.getValuesMap[Any](columns.toList))
  .map(x => Geom(x))
  .show

But it throws this exception:

Exception in thread "main" java.lang.ClassNotFoundException: scala.Any
at java.net.URLClassLoader.findClass(URLClassLoader.java:381)
at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:331)

Can someone help me figure out what is wrong with my code?

It works fine after moving the case class and the encoders out of the method and into a top-level object:

object SpatialEncoders {
  implicit def MapEncoder: Encoder[Map[String, Any]] = Encoders.kryo[Map[String, Any]]
  implicit def GeomEncoder: Encoder[Geom] = Encoders.kryo[Geom]
  implicit def SparkSQLGeometryEncoder: Encoder[SparkSQLGeometry] = Encoders.kryo[SparkSQLGeometry]
}
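For context, here is a minimal end-to-end sketch of how those object-level encoders might be wired up. The `Demo` object, the `local[*]` session, and the single-step `map` are assumptions for illustration, not part of the original answer; the Kryo encoders serialize `Geom` as a single binary blob, which is what keeps Spark's reflection-based encoder derivation from ever touching scala.Any (the cause of the ClassNotFoundException):

```scala
import org.apache.spark.sql.{Encoder, Encoders, SparkSession}

case class Geom(attributes: Map[String, Any])

// Encoders must live at top level (not inside a method) so the implicits
// resolve cleanly and the case class has a stable, loadable class name.
object SpatialEncoders {
  implicit def MapEncoder: Encoder[Map[String, Any]] = Encoders.kryo[Map[String, Any]]
  implicit def GeomEncoder: Encoder[Geom] = Encoders.kryo[Geom]
}

object Demo {
  def main(args: Array[String]): Unit = {
    val session = SparkSession.builder().master("local[*]").appName("geom").getOrCreate()
    import SpatialEncoders._  // bring the Kryo encoders into implicit scope

    val df = session.read
      .option("delimiter", "\t")
      .option("inferSchema", "true")
      .option("header", "true")
      .csv("src/main/resources/TexasPostCodes.txt")
      .persist()

    val columns = df.schema.fieldNames
    // One map step: GeomEncoder in scope lets Spark encode the result Dataset[Geom]
    val geoms = df.map(row => Geom(row.getValuesMap[Any](columns.toList)))
    geoms.show()
  }
}
```

Note the trade-off: a Kryo encoder stores each `Geom` as one opaque `binary` column, so `geoms.show()` will print serialized bytes rather than named columns, and you lose Catalyst optimizations on the map's contents.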