Kryo:反序列化旧版本的类

Ale*_*ssi 5 serialization scala kryo apache-spark spark-streaming

我需要通过添加两个新参数来修改类.这个类是用Kryo序列化的.每当我停止我的流时,我目前正在持续保存与此课程相关的信息,作为RDD.当我重新启动流时,我加载了以前持久化的信息,并使用它们在我停止和重新启动之间保持一致.

由于I类持久化需要这些新参数,我通过添加new kryo.writeObject(output, object, ObjectSerializer)和new参数来更改类和序列化器kryo.readObject(input, classOf[Object], ObjectSerializer).

现在,每当我重新启动流时,我都会获得一个异常:"遇到未注册的类......".

这似乎是显而易见的,因为我试图反序列化一个对象,这个对象在我停止流时我坚持的信息中没有包含.如果我删除这些数据并启动流,就好像它没有任何先前的运行一样,则不会发生异常.

有没有办法避免这种异常?也许通过指定一些默认值,以防这些参数丢失?

谢谢

编辑:

我找到了一些我以前没见过的有用的东西: Kryo问题194.

这个人通过简单地插入一个很长的定义他应该使用哪个版本的反序列化器来实现版本控制.这是一个简单的解决方案,但是,由于编写我正在编写的代码的公司没有考虑向前兼容性,我想我必须抛弃所有在新的序列化器之前保留的数据窗口.

如果有人能提出更好的解决方案,请告诉我.

编辑2:

仍然有这种情况的问题.我尝试使用此处描述的CompatibleFieldSerializer:CompatibleFieldSerializer示例 因此,通过注册此序列化程序而不是先前使用的自定义序列化程序.结果是,现在,当重新加载持久数据时,它给出了一个java.lang.NullPointerException.如果之前没有数据持续存在,仍然没有问题.我可以启动我的流,序列化新数据,停止流,反序列化并重新启动我的流.仍然没有解决方案的线索.

Ale*_*ssi 5

该问题的解决方案是在几个月前找到的。所以我想尽快发布这个问题的答案。问题在于,由于代码中的错误,该类使用标准的 Kryo FieldSerializer 进行了序列化,这不是向前兼容的。我们必须执行以下操作来反序列化旧类并将其转换为新的序列化类。

当时的情况是:

case class ClassA(field1 : Long, field2 : String)
Run Code Online (Sandbox Code Playgroud)

它是这样序列化的:

object ClassASerializer extends Serializer[ClassA] with Serializable{
  override def write(kryo: Kryo, output: Output, t: ClassA) = {
      output.writeLong    { t.field1 }
      output.writeString  { t.field2 }
 }
  override def read(kryo: Kryo, input: Input, aClass: Class[ClassA]) = 
       classA( 
           field1 = input.readLong(),
           field2 = input.readLong()
       )
Run Code Online (Sandbox Code Playgroud)

并且循环包含要使用序列化器序列化的类的 Seq,以便为所有类注册所有序列化器。

    protected def registry: Seq[aClass: Class[A], serializer: Serializer[A]] = ...
    final def register(kryo: Kryo) = {
         registry.foreach { registrable => kryo.register(registrable.aClass, registrable.serializer) }
    }
Run Code Online (Sandbox Code Playgroud)

该类需要通过添加一个新字段来修改,该字段是另一个案例类的实例。

为了执行此类更改,我们必须使用与 Kryo 库“可选”相关的注释,

...
import com.esotericsoftware.kryo.serializers.FieldSerializer.Optional
import scala.annotation.meta.field
...

case class ClassA(field1 : Long, field2 : String,  @(Optional @field)("field3") field3 : ClassB)
Run Code Online (Sandbox Code Playgroud)

序列化器被修改,例如在读取旧的序列化类时,它可以使用默认值实例化 field3,并在写入时写入这样的默认值:

object ClassASerializer extends Serializer[ClassA] with Serializable{
  override def write(kryo: Kryo, output: Output, t: ClassA) = {
      output.writeLong    { t.field1 }
      output.writeString  { t.field2 }
      kryo.writeObject(output, Option { t.field3 } getOrElse ClassB.default, ClassBSerializer)

 }
  override def read(kryo: Kryo, input: Input, aClass: Class[ClassA]) = 
       ClassA( 
           field1 = input.readLong(),
           field2 = input.readLong(),
           field3 = ClassB.default
       )
Run Code Online (Sandbox Code Playgroud)

kryo 序列化程序注册也被修改为注册可选字段:

    protected def registry: Seq[aClass: Class[A], serializer: Serializer[A]] = ...
    def optionals = Seq("field3")

    final def register(kryo: Kryo) = {
        optionals.foreach { optional =>
        kryo.getContext.asInstanceOf[ObjectMap[Any, Any]].put(optional, true) }
        registry.foreach { registrable => kryo.register(registrable.aClass, registrable.serializer) }
    }
Run Code Online (Sandbox Code Playgroud)

因此,我们能够编写新版本的序列化类。之后,我们不得不删除可选的注解,修改序列化器以从新的序列化类中读取真正的字段,并删除可选的序列化器注册并将其添加到注册表 Seq 中。

同时,我们更正了通过 FieldSerializer 强制序列化的代码中的错误,但这不在问题的范围内。