如何在Dataset中存储自定义对象?

zer*_*323 133 scala apache-spark apache-spark-dataset apache-spark-encoders

根据Spark数据集介绍:

正如我们期待着星火2.0,我们计划将数据集的一些令人兴奋的改进,具体包括:...自定义编码器 - 而我们对各种各样的类型目前自动生成编码器,我们想开了一个API,用于自定义对象.

并尝试将自定义类型存储为Dataset导致以下错误:

无法找到存储在数据集中的类型的编码器.导入sqlContext.implicits._支持原始类型(Int,String等)和产品类型(case类).将来版本中将添加对序列化其他类型的支持

要么:

Java.lang.UnsupportedOperationException:找不到针对....的编码器

有没有现成的解决方法?


请注意,此问题仅作为社区Wiki答案的入口点存在.随意更新/改进问题和答案.

Ale*_*lec 215

更新

这个答案仍然是有效的和翔实的,但现在情况更好,因为2.2/2.3,它加入了内置编码器的支持Set,Seq,Map,Date,Timestamp,和BigDecimal.如果你坚持只使用case类和通常的Scala类型来创建类型,那么你应该没有使用隐含的SQLImplicits.


不幸的是,几乎没有添加任何东西来帮助解决这个问题 在搜索@since 2.0.0Encoders.scala还是SQLImplicits.scala发现事情大多做基本类型(和案例类的一些调整).所以,首先要说的是:目前对自定义类编码器没有真正的好支持.有了这样的方式,接下来是一些技巧,我们能指望,因为我们目前有在我们的处置该做的好工作.作为前期免责声明:这将无法完美运行,我会尽力使所有限制清晰明确.

究竟是什么问题

当您想要创建数据集时,Spark"需要一个编码器(将类型T的JVM对象转换为内部Spark SQL表示形式),这通常是通过来自a的implicit自动创建的SparkSession,或者可以通过调用静态方法显式创建on Encoders"(取自文档createDataset).编码器将采取的形式Encoder[T],其中T是要在编码类型.第一个建议是添加import spark.implicits._(给你这些隐式编码器),第二个建议是使用组编码器相关函数显式传入隐式编码器.

普通班级没有编码器,所以

import spark.implicits._
class MyObj(val i: Int)
// ...
val d = spark.createDataset(Seq(new MyObj(1),new MyObj(2),new MyObj(3)))
Run Code Online (Sandbox Code Playgroud)

将为您提供以下隐式相关编译时错误:

无法找到存储在数据集中的类型的编码器.导入sqlContext.implicits._支持原始类型(Int,String等)和产品类型(case类).将来版本中将添加对序列化其他类型的支持

但是,如果你在某些扩展的类中包装你刚才用来获取上述错误的任何类型,那么Product这个错误会被混淆地延迟到运行时,所以

import spark.implicits._
case class Wrap[T](unwrap: T)
class MyObj(val i: Int)
// ...
val d = spark.createDataset(Seq(Wrap(new MyObj(1)),Wrap(new MyObj(2)),Wrap(new MyObj(3))))
Run Code Online (Sandbox Code Playgroud)

编译得很好,但在运行时失败了

java.lang.UnsupportedOperationException:找不到MyObj的编码器

这样做的原因是Spark使用implicits创建的编码器实际上只在运行时创建(通过scala relfection).在这种情况下,编译时的所有Spark检查都是最外层的类扩展Product(所有案例类都这样做),并且只在运行时意识到它仍然不知道该怎么做MyObj(如果我试图制作同样的问题) a Dataset[(Int,MyObj)]- Spark等到运行时到barf MyObj).这些是迫切需要修复的核心问题:

  • 一些扩展Product编译的类,尽管总是在运行时崩溃
  • 没有办法为嵌套类型传递自定义编码器(我无法为其提供Spark编码器,MyObj以便它知道如何编码Wrap[MyObj](Int,MyObj)).

只是用 kryo

每个人都建议的解决方案是使用kryo编码器.

import spark.implicits._
class MyObj(val i: Int)
implicit val myObjEncoder = org.apache.spark.sql.Encoders.kryo[MyObj]
// ...
val d = spark.createDataset(Seq(new MyObj(1),new MyObj(2),new MyObj(3)))
Run Code Online (Sandbox Code Playgroud)

尽管如此,这变得相当繁琐.特别是如果你的代码正在操纵各种数据集,加入,分组等等.你最终会产生一些额外的暗示.那么,为什么不做一个隐含的自动完成这一切呢?

import scala.reflect.ClassTag
implicit def kryoEncoder[A](implicit ct: ClassTag[A]) = 
  org.apache.spark.sql.Encoders.kryo[A](ct)
Run Code Online (Sandbox Code Playgroud)

现在,似乎我几乎可以做任何我想做的事情(下面的示例在自动导入的spark-shell地方不起作用spark.implicits._)

class MyObj(val i: Int)

val d1 = spark.createDataset(Seq(new MyObj(1),new MyObj(2),new MyObj(3)))
val d2 = d1.map(d => (d.i+1,d)).alias("d2") // mapping works fine and ..
val d3 = d1.map(d => (d.i,  d)).alias("d3") // .. deals with the new type
val d4 = d2.joinWith(d3, $"d2._1" === $"d3._1") // Boom!
Run Code Online (Sandbox Code Playgroud)

或差不多.问题是使用kryoSpark导致只将数据集中的每一行存储为平面二进制对象.对于map,filter,foreach那就足够了,但对于像操作join,星火真的需要这些被分隔成列.检查架构d2或者d3,你看到的只有一个二进制列:

d2.printSchema
// root
//  |-- value: binary (nullable = true)
Run Code Online (Sandbox Code Playgroud)

元组的部分解决方案

因此,使用Scala中隐含的魔力(6.26.3重载分辨率中的更多内容),我可以使自己成为一系列的瑕疵,尽可能做好工作,至少对于元组来说,并且可以很好地处理现有的含义:

import org.apache.spark.sql.{Encoder,Encoders}
import scala.reflect.ClassTag
import spark.implicits._  // we can still take advantage of all the old implicits

implicit def single[A](implicit c: ClassTag[A]): Encoder[A] = Encoders.kryo[A](c)

implicit def tuple2[A1, A2](
  implicit e1: Encoder[A1],
           e2: Encoder[A2]
): Encoder[(A1,A2)] = Encoders.tuple[A1,A2](e1, e2)

implicit def tuple3[A1, A2, A3](
  implicit e1: Encoder[A1],
           e2: Encoder[A2],
           e3: Encoder[A3]
): Encoder[(A1,A2,A3)] = Encoders.tuple[A1,A2,A3](e1, e2, e3)

// ... you can keep making these
Run Code Online (Sandbox Code Playgroud)

然后,有了这些暗示,我可以让我的例子在上面工作,尽管有一些列重命名

class MyObj(val i: Int)

val d1 = spark.createDataset(Seq(new MyObj(1),new MyObj(2),new MyObj(3)))
val d2 = d1.map(d => (d.i+1,d)).toDF("_1","_2").as[(Int,MyObj)].alias("d2")
val d3 = d1.map(d => (d.i  ,d)).toDF("_1","_2").as[(Int,MyObj)].alias("d3")
val d4 = d2.joinWith(d3, $"d2._1" === $"d3._1")
Run Code Online (Sandbox Code Playgroud)

我还没有弄清楚如何在没有重命名的情况下获得预期的元组名称(_1,, _2...) - 如果其他人想要玩这个,就是"value"引入名称的地方,就是元组的位置通常会添加名称.但是,关键是我现在有一个很好的结构化架构:

d4.printSchema
// root
//  |-- _1: struct (nullable = false)
//  |    |-- _1: integer (nullable = true)
//  |    |-- _2: binary (nullable = true)
//  |-- _2: struct (nullable = false)
//  |    |-- _1: integer (nullable = true)
//  |    |-- _2: binary (nullable = true)
Run Code Online (Sandbox Code Playgroud)

总而言之,这个解决方法:

  • 允许我们为元组获取单独的列(所以我们可以再次加入元组,是的!)
  • 我们可以再次依赖于暗示(所以不需要在kryo所有地方传递)
  • 几乎完全向后兼容import spark.implicits._(涉及一些重命名)
  • 没有让我们对加入kyro系列化二进制列,更不用说对这些领域有可能
  • 将一些元组列重命名为"value"会产生令人不快的副作用(如果需要,可以通过转换.toDF,指定新列名称以及转换回数据集来撤消这些副作用- 并且模式名称似乎通过连接保留,他们最需要的地方).

一般的类的部分解决方案

这个不太愉快,没有好的解决方案.但是,现在我们已经有了上面的元组解决方案,我有一个预感,另一个答案的隐式转换解决方案也会有点痛苦,因为你可以将更复杂的类转换为元组.然后,在创建数据集之后,您可能会使用数据框方法重命名列.如果一切顺利,这实际上是一个改进,因为我现在可以在我的类的字段上执行连接.如果我刚刚使用了一个kryo不可能的平面二进制序列化器.

这里是做了一切位的例子:我有一个类MyObj,其具有的类型的字段Int,java.util.UUID以及Set[String].第一个照顾自己.第二个,虽然我可以序列化使用kryo,如果存储为a会更有用String(因为UUIDs通常是我想要加入的东西).第三个真正属于二进制列.

class MyObj(val i: Int, val u: java.util.UUID, val s: Set[String])

// alias for the type to convert to and from
type MyObjEncoded = (Int, String, Set[String])

// implicit conversions
implicit def toEncoded(o: MyObj): MyObjEncoded = (o.i, o.u.toString, o.s)
implicit def fromEncoded(e: MyObjEncoded): MyObj =
  new MyObj(e._1, java.util.UUID.fromString(e._2), e._3)
Run Code Online (Sandbox Code Playgroud)

现在,我可以使用这个机制创建一个具有良好模式的数据集:

val d = spark.createDataset(Seq[MyObjEncoded](
  new MyObj(1, java.util.UUID.randomUUID, Set("foo")),
  new MyObj(2, java.util.UUID.randomUUID, Set("bar"))
)).toDF("i","u","s").as[MyObjEncoded]
Run Code Online (Sandbox Code Playgroud)

模式向我展示了具有正确名称的列和前两个我可以加入的对象.

d.printSchema
// root
//  |-- i: integer (nullable = false)
//  |-- u: string (nullable = true)
//  |-- s: binary (nullable = true)
Run Code Online (Sandbox Code Playgroud)


zer*_*323 28

  1. 使用通用编码器.

    有现在两个可用通用的编码器kryojavaSerialization其中后者明确地描述为:

    效率极低,应该只作为最后的手段.

    假设下课

    class Bar(i: Int) {
      override def toString = s"bar $i"
      def bar = i
    }
    
    Run Code Online (Sandbox Code Playgroud)

    您可以通过添加隐式编码器来使用这些编码器:

    object BarEncoders {
      implicit def barEncoder: org.apache.spark.sql.Encoder[Bar] = 
      org.apache.spark.sql.Encoders.kryo[Bar]
    }
    
    Run Code Online (Sandbox Code Playgroud)

    可以一起使用如下:

    object Main {
      def main(args: Array[String]) {
        val sc = new SparkContext("local",  "test", new SparkConf())
        val sqlContext = new SQLContext(sc)
        import sqlContext.implicits._
        import BarEncoders._
    
        val ds = Seq(new Bar(1)).toDS
        ds.show
    
        sc.stop()
      }
    }
    
    Run Code Online (Sandbox Code Playgroud)

    它将对象存储为binary列,因此在转换为DataFrame您时,您将获得以下架构:

    root
     |-- value: binary (nullable = true)
    
    Run Code Online (Sandbox Code Playgroud)

    也可以使用kryo编码器为特定字段编码元组:

    val longBarEncoder = Encoders.tuple(Encoders.scalaLong, Encoders.kryo[Bar])
    
    spark.createDataset(Seq((1L, new Bar(1))))(longBarEncoder)
    // org.apache.spark.sql.Dataset[(Long, Bar)] = [_1: bigint, _2: binary]
    
    Run Code Online (Sandbox Code Playgroud)

    请注意,我们不依赖于隐式编码器,而是明确地传递编码器,因此这很可能不适用于toDS方法.

  2. 使用隐式转换:

    提供可编码的表示与自定义类之间的隐式转换,例如:

    object BarConversions {
      implicit def toInt(bar: Bar): Int = bar.bar
      implicit def toBar(i: Int): Bar = new Bar(i)
    }
    
    object Main {
      def main(args: Array[String]) {
        val sc = new SparkContext("local",  "test", new SparkConf())
        val sqlContext = new SQLContext(sc)
        import sqlContext.implicits._
        import BarConversions._
    
        type EncodedBar = Int
    
        val bars: RDD[EncodedBar]  = sc.parallelize(Seq(new Bar(1)))
        val barsDS = bars.toDS
    
        barsDS.show
        barsDS.map(_.bar).show
    
        sc.stop()
      }
    }
    
    Run Code Online (Sandbox Code Playgroud)

相关问题:


Cho*_*ack 7

您可以使用UDTRegistration,然后使用Case Classes,Tuples等...所有这些都可以正常使用您的用户定义类型!

假设您要使用自定义枚举:

trait CustomEnum { def value:String }
case object Foo extends CustomEnum  { val value = "F" }
case object Bar extends CustomEnum  { val value = "B" }
object CustomEnum {
  def fromString(str:String) = Seq(Foo, Bar).find(_.value == str).get
}
Run Code Online (Sandbox Code Playgroud)

像这样注册:

// First define a UDT class for it:
class CustomEnumUDT extends UserDefinedType[CustomEnum] {
  override def sqlType: DataType = org.apache.spark.sql.types.StringType
  override def serialize(obj: CustomEnum): Any = org.apache.spark.unsafe.types.UTF8String.fromString(obj.value)
  // Note that this will be a UTF8String type
  override def deserialize(datum: Any): CustomEnum = CustomEnum.fromString(datum.toString)
  override def userClass: Class[CustomEnum] = classOf[CustomEnum]
}

// Then Register the UDT Class!
// NOTE: you have to put this file into the org.apache.spark package!
UDTRegistration.register(classOf[CustomEnum].getName, classOf[CustomEnumUDT].getName)
Run Code Online (Sandbox Code Playgroud)

然后用它!

case class UsingCustomEnum(id:Int, en:CustomEnum)

val seq = Seq(
  UsingCustomEnum(1, Foo),
  UsingCustomEnum(2, Bar),
  UsingCustomEnum(3, Foo)
).toDS()
seq.filter(_.en == Foo).show()
println(seq.collect())
Run Code Online (Sandbox Code Playgroud)

假设您要使用多态记录:

trait CustomPoly
case class FooPoly(id:Int) extends CustomPoly
case class BarPoly(value:String, secondValue:Long) extends CustomPoly
Run Code Online (Sandbox Code Playgroud)

......并且像这样使用它:

case class UsingPoly(id:Int, poly:CustomPoly)

Seq(
  UsingPoly(1, new FooPoly(1)),
  UsingPoly(2, new BarPoly("Blah", 123)),
  UsingPoly(3, new FooPoly(1))
).toDS

polySeq.filter(_.poly match {
  case FooPoly(value) => value == 1
  case _ => false
}).show()
Run Code Online (Sandbox Code Playgroud)

您可以编写一个自定义UDT,将所有内容编码为字节(我在这里使用java序列化,但最好是使用Spark的Kryo上下文).

首先定义UDT类:

class CustomPolyUDT extends UserDefinedType[CustomPoly] {
  val kryo = new Kryo()

  override def sqlType: DataType = org.apache.spark.sql.types.BinaryType
  override def serialize(obj: CustomPoly): Any = {
    val bos = new ByteArrayOutputStream()
    val oos = new ObjectOutputStream(bos)
    oos.writeObject(obj)

    bos.toByteArray
  }
  override def deserialize(datum: Any): CustomPoly = {
    val bis = new ByteArrayInputStream(datum.asInstanceOf[Array[Byte]])
    val ois = new ObjectInputStream(bis)
    val obj = ois.readObject()
    obj.asInstanceOf[CustomPoly]
  }

  override def userClass: Class[CustomPoly] = classOf[CustomPoly]
}
Run Code Online (Sandbox Code Playgroud)

然后注册:

// NOTE: The file you do this in has to be inside of the org.apache.spark package!
UDTRegistration.register(classOf[CustomPoly].getName, classOf[CustomPolyUDT].getName)
Run Code Online (Sandbox Code Playgroud)

那你就可以用了!

// As shown above:
case class UsingPoly(id:Int, poly:CustomPoly)

Seq(
  UsingPoly(1, new FooPoly(1)),
  UsingPoly(2, new BarPoly("Blah", 123)),
  UsingPoly(3, new FooPoly(1))
).toDS

polySeq.filter(_.poly match {
  case FooPoly(value) => value == 1
  case _ => false
}).show()
Run Code Online (Sandbox Code Playgroud)


Sar*_*ngh 6

编码器的工作方式大致相同Spark2.0.并且Kryo仍然是推荐的serialization选择.

您可以使用spark-shell查看以下示例

scala> import spark.implicits._
import spark.implicits._

scala> import org.apache.spark.sql.Encoders
import org.apache.spark.sql.Encoders

scala> case class NormalPerson(name: String, age: Int) {
 |   def aboutMe = s"I am ${name}. I am ${age} years old."
 | }
defined class NormalPerson

scala> case class ReversePerson(name: Int, age: String) {
 |   def aboutMe = s"I am ${name}. I am ${age} years old."
 | }
defined class ReversePerson

scala> val normalPersons = Seq(
 |   NormalPerson("Superman", 25),
 |   NormalPerson("Spiderman", 17),
 |   NormalPerson("Ironman", 29)
 | )
normalPersons: Seq[NormalPerson] = List(NormalPerson(Superman,25), NormalPerson(Spiderman,17), NormalPerson(Ironman,29))

scala> val ds1 = sc.parallelize(normalPersons).toDS
ds1: org.apache.spark.sql.Dataset[NormalPerson] = [name: string, age: int]

scala> val ds2 = ds1.map(np => ReversePerson(np.age, np.name))
ds2: org.apache.spark.sql.Dataset[ReversePerson] = [name: int, age: string]

scala> ds1.show()
+---------+---+
|     name|age|
+---------+---+
| Superman| 25|
|Spiderman| 17|
|  Ironman| 29|
+---------+---+

scala> ds2.show()
+----+---------+
|name|      age|
+----+---------+
|  25| Superman|
|  17|Spiderman|
|  29|  Ironman|
+----+---------+

scala> ds1.foreach(p => println(p.aboutMe))
I am Ironman. I am 29 years old.
I am Superman. I am 25 years old.
I am Spiderman. I am 17 years old.

scala> val ds2 = ds1.map(np => ReversePerson(np.age, np.name))
ds2: org.apache.spark.sql.Dataset[ReversePerson] = [name: int, age: string]

scala> ds2.foreach(p => println(p.aboutMe))
I am 17. I am Spiderman years old.
I am 25. I am Superman years old.
I am 29. I am Ironman years old.
Run Code Online (Sandbox Code Playgroud)

到目前为止,appropriate encoders在目前的范围内没有,所以我们的人没有被编码为binary价值观.但是,一旦我们implicit使用Kryo序列化提供一些编码器,这将会改变.

// Provide Encoders

scala> implicit val normalPersonKryoEncoder = Encoders.kryo[NormalPerson]
normalPersonKryoEncoder: org.apache.spark.sql.Encoder[NormalPerson] = class[value[0]: binary]

scala> implicit val reversePersonKryoEncoder = Encoders.kryo[ReversePerson]
reversePersonKryoEncoder: org.apache.spark.sql.Encoder[ReversePerson] = class[value[0]: binary]

// Ecoders will be used since they are now present in Scope

scala> val ds3 = sc.parallelize(normalPersons).toDS
ds3: org.apache.spark.sql.Dataset[NormalPerson] = [value: binary]

scala> val ds4 = ds3.map(np => ReversePerson(np.age, np.name))
ds4: org.apache.spark.sql.Dataset[ReversePerson] = [value: binary]

// now all our persons show up as binary values
scala> ds3.show()
+--------------------+
|               value|
+--------------------+
|[01 00 24 6C 69 6...|
|[01 00 24 6C 69 6...|
|[01 00 24 6C 69 6...|
+--------------------+

scala> ds4.show()
+--------------------+
|               value|
+--------------------+
|[01 00 24 6C 69 6...|
|[01 00 24 6C 69 6...|
|[01 00 24 6C 69 6...|
+--------------------+

// Our instances still work as expected    

scala> ds3.foreach(p => println(p.aboutMe))
I am Ironman. I am 29 years old.
I am Spiderman. I am 17 years old.
I am Superman. I am 25 years old.

scala> ds4.foreach(p => println(p.aboutMe))
I am 25. I am Superman years old.
I am 29. I am Ironman years old.
I am 17. I am Spiderman years old.
Run Code Online (Sandbox Code Playgroud)

  • 当我们执行“.show”时,如何在使用编码器后转换回正常的非二进制值? (2认同)