Spark Kryo:注册自定义序列化程序

mar*_*ios 11 scala kryo apache-spark

我有一个类通过实现read()write()方法实现自定义Kryo序列化器com.esotericsoftware.kryo.Serializer(参见下面的示例).如何在Spark中注册此自定义序列化程序?

这是我所拥有的伪代码示例:

class A() 

CustomASerializer extends com.esotericsoftware.kryo.Serializer[A]{
    override def write(kryo: Kryo, output: Output, a: A): Unit = ???
    override def read(kryo: Kryo, input: Input, t: Class[A]): A = ???
}

val kryo: Kryo = ... 
kryo.register(classOf[A], new CustomASerializer()); // I can register my serializer
Run Code Online (Sandbox Code Playgroud)

现在在Spark:

val sparkConf = new SparkConf()
sparkConf.registerKryoClasses(Array(classOf[A]))
Run Code Online (Sandbox Code Playgroud)

不幸的是,Spark没有给我选择注册我的自定义序列化程序.知道是否有办法做到这一点?

Tza*_*har 16

KryoRegistrator使用此自定义序列化程序注册创建您自己的:

package com.acme

class MyRegistrator extends KryoRegistrator {
  override def registerClasses(kryo: Kryo) {
    kryo.register(classOf[A], new CustomASerializer())
  } 
}
Run Code Online (Sandbox Code Playgroud)

然后,设置spark.kryo.registrator为registrator的完全限定名称,例如com.acme.MyRegistrator:

val conf = new SparkConf()
conf.set("spark.kryo.registrator", "com.acme.KryoRegistrator")
Run Code Online (Sandbox Code Playgroud)

  • 这在spark文档中不是很清楚,但这绝对可行。如果您遇到Kryo无法在Spark中使用无参数构造函数序列化类的问题(对我来说是org.apache.spark.sql.catalyst.expressions.codegen.LazilyGeneratedOrdering),那么可以解决此问题通过使用kryo.register(LazilyGeneratedOrdering.class,new JavaSerializer()); 谢谢! (3认同)