zer*_*323 133 scala apache-spark apache-spark-dataset apache-spark-encoders
根据Spark数据集介绍:
正如我们期待着星火2.0,我们计划将数据集的一些令人兴奋的改进,具体包括:...自定义编码器 - 而我们对各种各样的类型目前自动生成编码器,我们想开了一个API,用于自定义对象.
并尝试将自定义类型存储为Dataset
导致以下错误:
无法找到存储在数据集中的类型的编码器.导入sqlContext.implicits._支持原始类型(Int,String等)和产品类型(case类).将来版本中将添加对序列化其他类型的支持
要么:
Java.lang.UnsupportedOperationException:找不到针对....的编码器
有没有现成的解决方法?
请注意,此问题仅作为社区Wiki答案的入口点存在.随意更新/改进问题和答案.
Ale*_*lec 215
这个答案仍然是有效的和翔实的,但现在情况更好,因为2.2/2.3,它加入了内置编码器的支持Set
,Seq
,Map
,Date
,Timestamp
,和BigDecimal
.如果你坚持只使用case类和通常的Scala类型来创建类型,那么你应该没有使用隐含的SQLImplicits
.
不幸的是,几乎没有添加任何东西来帮助解决这个问题 在搜索@since 2.0.0
中Encoders.scala
还是SQLImplicits.scala
发现事情大多做基本类型(和案例类的一些调整).所以,首先要说的是:目前对自定义类编码器没有真正的好支持.有了这样的方式,接下来是一些技巧,我们能指望,因为我们目前有在我们的处置该做的好工作.作为前期免责声明:这将无法完美运行,我会尽力使所有限制清晰明确.
当您想要创建数据集时,Spark"需要一个编码器(将类型T的JVM对象转换为内部Spark SQL表示形式),这通常是通过来自a的implicit自动创建的SparkSession
,或者可以通过调用静态方法显式创建on Encoders
"(取自文档createDataset
).编码器将采取的形式Encoder[T]
,其中T
是要在编码类型.第一个建议是添加import spark.implicits._
(给你这些隐式编码器),第二个建议是使用这组编码器相关函数显式传入隐式编码器.
普通班级没有编码器,所以
import spark.implicits._
class MyObj(val i: Int)
// ...
val d = spark.createDataset(Seq(new MyObj(1),new MyObj(2),new MyObj(3)))
Run Code Online (Sandbox Code Playgroud)
将为您提供以下隐式相关编译时错误:
无法找到存储在数据集中的类型的编码器.导入sqlContext.implicits._支持原始类型(Int,String等)和产品类型(case类).将来版本中将添加对序列化其他类型的支持
但是,如果你在某些扩展的类中包装你刚才用来获取上述错误的任何类型,那么Product
这个错误会被混淆地延迟到运行时,所以
import spark.implicits._
case class Wrap[T](unwrap: T)
class MyObj(val i: Int)
// ...
val d = spark.createDataset(Seq(Wrap(new MyObj(1)),Wrap(new MyObj(2)),Wrap(new MyObj(3))))
Run Code Online (Sandbox Code Playgroud)
编译得很好,但在运行时失败了
java.lang.UnsupportedOperationException:找不到MyObj的编码器
这样做的原因是Spark使用implicits创建的编码器实际上只在运行时创建(通过scala relfection).在这种情况下,编译时的所有Spark检查都是最外层的类扩展Product
(所有案例类都这样做),并且只在运行时意识到它仍然不知道该怎么做MyObj
(如果我试图制作同样的问题) a Dataset[(Int,MyObj)]
- Spark等到运行时到barf MyObj
).这些是迫切需要修复的核心问题:
Product
编译的类,尽管总是在运行时崩溃MyObj
以便它知道如何编码Wrap[MyObj]
或(Int,MyObj)
).kryo
每个人都建议的解决方案是使用kryo
编码器.
import spark.implicits._
class MyObj(val i: Int)
implicit val myObjEncoder = org.apache.spark.sql.Encoders.kryo[MyObj]
// ...
val d = spark.createDataset(Seq(new MyObj(1),new MyObj(2),new MyObj(3)))
Run Code Online (Sandbox Code Playgroud)
尽管如此,这变得相当繁琐.特别是如果你的代码正在操纵各种数据集,加入,分组等等.你最终会产生一些额外的暗示.那么,为什么不做一个隐含的自动完成这一切呢?
import scala.reflect.ClassTag
implicit def kryoEncoder[A](implicit ct: ClassTag[A]) =
org.apache.spark.sql.Encoders.kryo[A](ct)
Run Code Online (Sandbox Code Playgroud)
现在,似乎我几乎可以做任何我想做的事情(下面的示例在自动导入的spark-shell
地方不起作用spark.implicits._
)
class MyObj(val i: Int)
val d1 = spark.createDataset(Seq(new MyObj(1),new MyObj(2),new MyObj(3)))
val d2 = d1.map(d => (d.i+1,d)).alias("d2") // mapping works fine and ..
val d3 = d1.map(d => (d.i, d)).alias("d3") // .. deals with the new type
val d4 = d2.joinWith(d3, $"d2._1" === $"d3._1") // Boom!
Run Code Online (Sandbox Code Playgroud)
或差不多.问题是使用kryo
Spark导致只将数据集中的每一行存储为平面二进制对象.对于map
,filter
,foreach
那就足够了,但对于像操作join
,星火真的需要这些被分隔成列.检查架构d2
或者d3
,你看到的只有一个二进制列:
d2.printSchema
// root
// |-- value: binary (nullable = true)
Run Code Online (Sandbox Code Playgroud)
因此,使用Scala中隐含的魔力(6.26.3重载分辨率中的更多内容),我可以使自己成为一系列的瑕疵,尽可能做好工作,至少对于元组来说,并且可以很好地处理现有的含义:
import org.apache.spark.sql.{Encoder,Encoders}
import scala.reflect.ClassTag
import spark.implicits._ // we can still take advantage of all the old implicits
implicit def single[A](implicit c: ClassTag[A]): Encoder[A] = Encoders.kryo[A](c)
implicit def tuple2[A1, A2](
implicit e1: Encoder[A1],
e2: Encoder[A2]
): Encoder[(A1,A2)] = Encoders.tuple[A1,A2](e1, e2)
implicit def tuple3[A1, A2, A3](
implicit e1: Encoder[A1],
e2: Encoder[A2],
e3: Encoder[A3]
): Encoder[(A1,A2,A3)] = Encoders.tuple[A1,A2,A3](e1, e2, e3)
// ... you can keep making these
Run Code Online (Sandbox Code Playgroud)
然后,有了这些暗示,我可以让我的例子在上面工作,尽管有一些列重命名
class MyObj(val i: Int)
val d1 = spark.createDataset(Seq(new MyObj(1),new MyObj(2),new MyObj(3)))
val d2 = d1.map(d => (d.i+1,d)).toDF("_1","_2").as[(Int,MyObj)].alias("d2")
val d3 = d1.map(d => (d.i ,d)).toDF("_1","_2").as[(Int,MyObj)].alias("d3")
val d4 = d2.joinWith(d3, $"d2._1" === $"d3._1")
Run Code Online (Sandbox Code Playgroud)
我还没有弄清楚如何在没有重命名的情况下获得预期的元组名称(_1
,, _2
...) - 如果其他人想要玩这个,这就是"value"
引入名称的地方,这就是元组的位置通常会添加名称.但是,关键是我现在有一个很好的结构化架构:
d4.printSchema
// root
// |-- _1: struct (nullable = false)
// | |-- _1: integer (nullable = true)
// | |-- _2: binary (nullable = true)
// |-- _2: struct (nullable = false)
// | |-- _1: integer (nullable = true)
// | |-- _2: binary (nullable = true)
Run Code Online (Sandbox Code Playgroud)
总而言之,这个解决方法:
kryo
所有地方传递)import spark.implicits._
(涉及一些重命名)kyro
系列化二进制列,更不用说对这些领域有可能.toDF
,指定新列名称以及转换回数据集来撤消这些副作用- 并且模式名称似乎通过连接保留,他们最需要的地方).这个不太愉快,没有好的解决方案.但是,现在我们已经有了上面的元组解决方案,我有一个预感,另一个答案的隐式转换解决方案也会有点痛苦,因为你可以将更复杂的类转换为元组.然后,在创建数据集之后,您可能会使用数据框方法重命名列.如果一切顺利,这实际上是一个改进,因为我现在可以在我的类的字段上执行连接.如果我刚刚使用了一个kryo
不可能的平面二进制序列化器.
这里是做了一切位的例子:我有一个类MyObj
,其具有的类型的字段Int
,java.util.UUID
以及Set[String]
.第一个照顾自己.第二个,虽然我可以序列化使用kryo
,如果存储为a会更有用String
(因为UUID
s通常是我想要加入的东西).第三个真正属于二进制列.
class MyObj(val i: Int, val u: java.util.UUID, val s: Set[String])
// alias for the type to convert to and from
type MyObjEncoded = (Int, String, Set[String])
// implicit conversions
implicit def toEncoded(o: MyObj): MyObjEncoded = (o.i, o.u.toString, o.s)
implicit def fromEncoded(e: MyObjEncoded): MyObj =
new MyObj(e._1, java.util.UUID.fromString(e._2), e._3)
Run Code Online (Sandbox Code Playgroud)
现在,我可以使用这个机制创建一个具有良好模式的数据集:
val d = spark.createDataset(Seq[MyObjEncoded](
new MyObj(1, java.util.UUID.randomUUID, Set("foo")),
new MyObj(2, java.util.UUID.randomUUID, Set("bar"))
)).toDF("i","u","s").as[MyObjEncoded]
Run Code Online (Sandbox Code Playgroud)
模式向我展示了具有正确名称的列和前两个我可以加入的对象.
d.printSchema
// root
// |-- i: integer (nullable = false)
// |-- u: string (nullable = true)
// |-- s: binary (nullable = true)
Run Code Online (Sandbox Code Playgroud)
zer*_*323 28
使用通用编码器.
有现在两个可用通用的编码器kryo
和javaSerialization
其中后者明确地描述为:
效率极低,应该只作为最后的手段.
假设下课
class Bar(i: Int) {
override def toString = s"bar $i"
def bar = i
}
Run Code Online (Sandbox Code Playgroud)
您可以通过添加隐式编码器来使用这些编码器:
object BarEncoders {
implicit def barEncoder: org.apache.spark.sql.Encoder[Bar] =
org.apache.spark.sql.Encoders.kryo[Bar]
}
Run Code Online (Sandbox Code Playgroud)
可以一起使用如下:
object Main {
def main(args: Array[String]) {
val sc = new SparkContext("local", "test", new SparkConf())
val sqlContext = new SQLContext(sc)
import sqlContext.implicits._
import BarEncoders._
val ds = Seq(new Bar(1)).toDS
ds.show
sc.stop()
}
}
Run Code Online (Sandbox Code Playgroud)
它将对象存储为binary
列,因此在转换为DataFrame
您时,您将获得以下架构:
root
|-- value: binary (nullable = true)
Run Code Online (Sandbox Code Playgroud)
也可以使用kryo
编码器为特定字段编码元组:
val longBarEncoder = Encoders.tuple(Encoders.scalaLong, Encoders.kryo[Bar])
spark.createDataset(Seq((1L, new Bar(1))))(longBarEncoder)
// org.apache.spark.sql.Dataset[(Long, Bar)] = [_1: bigint, _2: binary]
Run Code Online (Sandbox Code Playgroud)
请注意,我们不依赖于隐式编码器,而是明确地传递编码器,因此这很可能不适用于toDS
方法.
使用隐式转换:
提供可编码的表示与自定义类之间的隐式转换,例如:
object BarConversions {
implicit def toInt(bar: Bar): Int = bar.bar
implicit def toBar(i: Int): Bar = new Bar(i)
}
object Main {
def main(args: Array[String]) {
val sc = new SparkContext("local", "test", new SparkConf())
val sqlContext = new SQLContext(sc)
import sqlContext.implicits._
import BarConversions._
type EncodedBar = Int
val bars: RDD[EncodedBar] = sc.parallelize(Seq(new Bar(1)))
val barsDS = bars.toDS
barsDS.show
barsDS.map(_.bar).show
sc.stop()
}
}
Run Code Online (Sandbox Code Playgroud)相关问题:
您可以使用UDTRegistration,然后使用Case Classes,Tuples等...所有这些都可以正常使用您的用户定义类型!
假设您要使用自定义枚举:
trait CustomEnum { def value:String }
case object Foo extends CustomEnum { val value = "F" }
case object Bar extends CustomEnum { val value = "B" }
object CustomEnum {
def fromString(str:String) = Seq(Foo, Bar).find(_.value == str).get
}
Run Code Online (Sandbox Code Playgroud)
像这样注册:
// First define a UDT class for it:
class CustomEnumUDT extends UserDefinedType[CustomEnum] {
override def sqlType: DataType = org.apache.spark.sql.types.StringType
override def serialize(obj: CustomEnum): Any = org.apache.spark.unsafe.types.UTF8String.fromString(obj.value)
// Note that this will be a UTF8String type
override def deserialize(datum: Any): CustomEnum = CustomEnum.fromString(datum.toString)
override def userClass: Class[CustomEnum] = classOf[CustomEnum]
}
// Then Register the UDT Class!
// NOTE: you have to put this file into the org.apache.spark package!
UDTRegistration.register(classOf[CustomEnum].getName, classOf[CustomEnumUDT].getName)
Run Code Online (Sandbox Code Playgroud)
然后用它!
case class UsingCustomEnum(id:Int, en:CustomEnum)
val seq = Seq(
UsingCustomEnum(1, Foo),
UsingCustomEnum(2, Bar),
UsingCustomEnum(3, Foo)
).toDS()
seq.filter(_.en == Foo).show()
println(seq.collect())
Run Code Online (Sandbox Code Playgroud)
假设您要使用多态记录:
trait CustomPoly
case class FooPoly(id:Int) extends CustomPoly
case class BarPoly(value:String, secondValue:Long) extends CustomPoly
Run Code Online (Sandbox Code Playgroud)
......并且像这样使用它:
case class UsingPoly(id:Int, poly:CustomPoly)
Seq(
UsingPoly(1, new FooPoly(1)),
UsingPoly(2, new BarPoly("Blah", 123)),
UsingPoly(3, new FooPoly(1))
).toDS
polySeq.filter(_.poly match {
case FooPoly(value) => value == 1
case _ => false
}).show()
Run Code Online (Sandbox Code Playgroud)
您可以编写一个自定义UDT,将所有内容编码为字节(我在这里使用java序列化,但最好是使用Spark的Kryo上下文).
首先定义UDT类:
class CustomPolyUDT extends UserDefinedType[CustomPoly] {
val kryo = new Kryo()
override def sqlType: DataType = org.apache.spark.sql.types.BinaryType
override def serialize(obj: CustomPoly): Any = {
val bos = new ByteArrayOutputStream()
val oos = new ObjectOutputStream(bos)
oos.writeObject(obj)
bos.toByteArray
}
override def deserialize(datum: Any): CustomPoly = {
val bis = new ByteArrayInputStream(datum.asInstanceOf[Array[Byte]])
val ois = new ObjectInputStream(bis)
val obj = ois.readObject()
obj.asInstanceOf[CustomPoly]
}
override def userClass: Class[CustomPoly] = classOf[CustomPoly]
}
Run Code Online (Sandbox Code Playgroud)
然后注册:
// NOTE: The file you do this in has to be inside of the org.apache.spark package!
UDTRegistration.register(classOf[CustomPoly].getName, classOf[CustomPolyUDT].getName)
Run Code Online (Sandbox Code Playgroud)
那你就可以用了!
// As shown above:
case class UsingPoly(id:Int, poly:CustomPoly)
Seq(
UsingPoly(1, new FooPoly(1)),
UsingPoly(2, new BarPoly("Blah", 123)),
UsingPoly(3, new FooPoly(1))
).toDS
polySeq.filter(_.poly match {
case FooPoly(value) => value == 1
case _ => false
}).show()
Run Code Online (Sandbox Code Playgroud)
编码器的工作方式大致相同Spark2.0
.并且Kryo
仍然是推荐的serialization
选择.
您可以使用spark-shell查看以下示例
scala> import spark.implicits._
import spark.implicits._
scala> import org.apache.spark.sql.Encoders
import org.apache.spark.sql.Encoders
scala> case class NormalPerson(name: String, age: Int) {
| def aboutMe = s"I am ${name}. I am ${age} years old."
| }
defined class NormalPerson
scala> case class ReversePerson(name: Int, age: String) {
| def aboutMe = s"I am ${name}. I am ${age} years old."
| }
defined class ReversePerson
scala> val normalPersons = Seq(
| NormalPerson("Superman", 25),
| NormalPerson("Spiderman", 17),
| NormalPerson("Ironman", 29)
| )
normalPersons: Seq[NormalPerson] = List(NormalPerson(Superman,25), NormalPerson(Spiderman,17), NormalPerson(Ironman,29))
scala> val ds1 = sc.parallelize(normalPersons).toDS
ds1: org.apache.spark.sql.Dataset[NormalPerson] = [name: string, age: int]
scala> val ds2 = ds1.map(np => ReversePerson(np.age, np.name))
ds2: org.apache.spark.sql.Dataset[ReversePerson] = [name: int, age: string]
scala> ds1.show()
+---------+---+
| name|age|
+---------+---+
| Superman| 25|
|Spiderman| 17|
| Ironman| 29|
+---------+---+
scala> ds2.show()
+----+---------+
|name| age|
+----+---------+
| 25| Superman|
| 17|Spiderman|
| 29| Ironman|
+----+---------+
scala> ds1.foreach(p => println(p.aboutMe))
I am Ironman. I am 29 years old.
I am Superman. I am 25 years old.
I am Spiderman. I am 17 years old.
scala> val ds2 = ds1.map(np => ReversePerson(np.age, np.name))
ds2: org.apache.spark.sql.Dataset[ReversePerson] = [name: int, age: string]
scala> ds2.foreach(p => println(p.aboutMe))
I am 17. I am Spiderman years old.
I am 25. I am Superman years old.
I am 29. I am Ironman years old.
Run Code Online (Sandbox Code Playgroud)
到目前为止,appropriate encoders
在目前的范围内没有,所以我们的人没有被编码为binary
价值观.但是,一旦我们implicit
使用Kryo
序列化提供一些编码器,这将会改变.
// Provide Encoders
scala> implicit val normalPersonKryoEncoder = Encoders.kryo[NormalPerson]
normalPersonKryoEncoder: org.apache.spark.sql.Encoder[NormalPerson] = class[value[0]: binary]
scala> implicit val reversePersonKryoEncoder = Encoders.kryo[ReversePerson]
reversePersonKryoEncoder: org.apache.spark.sql.Encoder[ReversePerson] = class[value[0]: binary]
// Ecoders will be used since they are now present in Scope
scala> val ds3 = sc.parallelize(normalPersons).toDS
ds3: org.apache.spark.sql.Dataset[NormalPerson] = [value: binary]
scala> val ds4 = ds3.map(np => ReversePerson(np.age, np.name))
ds4: org.apache.spark.sql.Dataset[ReversePerson] = [value: binary]
// now all our persons show up as binary values
scala> ds3.show()
+--------------------+
| value|
+--------------------+
|[01 00 24 6C 69 6...|
|[01 00 24 6C 69 6...|
|[01 00 24 6C 69 6...|
+--------------------+
scala> ds4.show()
+--------------------+
| value|
+--------------------+
|[01 00 24 6C 69 6...|
|[01 00 24 6C 69 6...|
|[01 00 24 6C 69 6...|
+--------------------+
// Our instances still work as expected
scala> ds3.foreach(p => println(p.aboutMe))
I am Ironman. I am 29 years old.
I am Spiderman. I am 17 years old.
I am Superman. I am 25 years old.
scala> ds4.foreach(p => println(p.aboutMe))
I am 25. I am Superman years old.
I am 29. I am Ironman years old.
I am 17. I am Spiderman years old.
Run Code Online (Sandbox Code Playgroud)