如何在 Spark 代码中设置 Kryo 的不可修改集合序列化器

Poo*_*dar 4 kryo apache-spark

我在 Java 中的 Spark (v1.6.1) 中使用 Kryo 序列化,并且在序列化其字段中有集合的类时,它引发以下错误 -

Caused by: java.lang.UnsupportedOperationException
         at java.util.Collections$UnmodifiableCollection.add(Collections.java:1055)
         at com.esotericsoftware.kryo.serializers.CollectionSerializer.read(CollectionSerializer.java:102)
         at com.esotericsoftware.kryo.serializers.CollectionSerializer.read(CollectionSerializer.java:18)
         at com.esotericsoftware.kryo.Kryo.readObject(Kryo.java:648)
         at com.esotericsoftware.kryo.serializers.FieldSerializer$ObjectField.read(FieldSerializer.java:605)
         ... 27 more
Run Code Online (Sandbox Code Playgroud)

我发现这是因为 Kryo 的默认 CollectionSerializer 无法反序列化集合,因为它不可修改,我们应该改用 UnmodifiableCollectionsSerializer。

我如何在 spark 代码中特别提到将 UnmodifiableCollectionsSerializer 用于 Kryo?

我目前的配置是 -

SparkConf conf = new SparkConf().setAppName("ABC");
conf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer");
conf.registerKryoClasses(new Class<?>[] {*list of classes I want to register*});
Run Code Online (Sandbox Code Playgroud)

Poo*_*dar 6

如果其他人遇到这个问题,这里是解决方案 - 我使用 javakaffee kryo 序列化程序让它工作。

添加以下 maven 依赖项:

<dependency>
        <groupId>de.javakaffee</groupId>
        <artifactId>kryo-serializers</artifactId>
        <version>0.42</version>
</dependency>
Run Code Online (Sandbox Code Playgroud)

编写自定义 kryo 注册器来注册 UnmodifiableCollectionsSerializer

    public class CustomKryoRegistrator implements KryoRegistrator {
        @Override
        public void registerClasses(Kryo kryo) {        
             UnmodifiableCollectionsSerializer.registerSerializers(kryo);
        }
   }
Run Code Online (Sandbox Code Playgroud)

将 spark.kryo.registrator 设置为自定义注册器的完全限定名称

conf.set("spark.kryo.registrator", "com.abc.CustomKryoRegistrator");
Run Code Online (Sandbox Code Playgroud)

参考 -

https://github.com/magro/kryo-serializers

Spark Kryo:注册自定义序列化程序