Flink serialization of java.util.List and java.util.Map

Ale*_*uss 2 java apache-flink

My Flink pipeline currently uses POJOs that contain some lists and maps (of strings), along the lines of:

public class MyPojo {
    private List<String> myList = new ArrayList<>();
    private OtherPojo otherPojo = new OtherPojo();

    // getters + setters...
}

public class OtherPojo {
    private Map<String, String> myMap = new HashMap<>();

    // getters + setters...
}


For performance reasons I want to bypass Kryo serialization, so I disabled the generic fallback with env.getConfig().disableGenericTypes(); as described in the Flink documentation.

Now Flink complains about the list:

Exception in thread "main" java.lang.UnsupportedOperationException: Generic types have been disabled in the ExecutionConfig and type java.util.List is treated as a generic type.
    at org.apache.flink.api.java.typeutils.GenericTypeInfo.createSerializer(GenericTypeInfo.java:86)
    at org.apache.flink.api.java.typeutils.PojoTypeInfo.createPojoSerializer(PojoTypeInfo.java:319)
    at org.apache.flink.api.java.typeutils.PojoTypeInfo.createSerializer(PojoTypeInfo.java:311)
    at org.apache.flink.streaming.api.graph.StreamGraph.addOperator(StreamGraph.java:258)
    at org.apache.flink.streaming.api.graph.StreamGraphGenerator.transformOneInputTransform(StreamGraphGenerator.java:649)
    at org.apache.flink.streaming.api.graph.StreamGraphGenerator.transform(StreamGraphGenerator.java:250)
    at org.apache.flink.streaming.api.graph.StreamGraphGenerator.generate(StreamGraphGenerator.java:209)
    at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.getStreamGraph(StreamExecutionEnvironment.java:1540)
    at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.execute(StreamExecutionEnvironment.java:1507)
    ...

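What is the preferred way to serialize such simple lists and maps in Flink? Internally these are currently ArrayList and HashMap, but other implementations would be fine as well. There seems to be a class org.apache.flink.api.common.typeutils.base.ListSerializer in Flink, but I don't know how to use it.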

Mar*_*nas 7

If you do this:

env.getConfig().disableGenericTypes();

Flink throws an exception whenever it encounters a data type that would have to go through Kryo.


So in this case you have to write your own serializer. A TypeSerializer can be created by simply calling typeInfo.createSerializer(config) on a TypeInformation object.


For generic types, you need to “capture” the generic type information with a TypeHint, in your case for the list:

TypeInformation<List<Object>> info = TypeInformation.of(new TypeHint<List<Object>>(){});
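The “capture” relies on a general Java idiom, often called a super type token: an anonymous subclass such as new TypeHint<...>(){} records its parameterized superclass in its class file, so the type argument survives erasure and can be read back by reflection. The plain-Java sketch below shows only that idiom; TypeCapture and TypeCaptureDemo are hypothetical names, not Flink classes, and Flink's TypeHint does more with the captured type (it hands it to Flink's type extraction to produce a TypeInformation).

```java
import java.lang.reflect.ParameterizedType;
import java.lang.reflect.Type;
import java.util.List;

public class TypeCaptureDemo {

    // Hypothetical stand-in for the idea behind Flink's TypeHint (not Flink's class).
    // It must be subclassed, usually anonymously, so that the type argument is stored
    // in the subclass's generic superclass instead of being erased.
    public abstract static class TypeCapture<T> {
        public final Type captured;

        protected TypeCapture() {
            // For `new TypeCapture<List<String>>() {}` the generic superclass of the
            // anonymous class is TypeCapture<List<String>>; keep its first type argument.
            ParameterizedType superType = (ParameterizedType) getClass().getGenericSuperclass();
            this.captured = superType.getActualTypeArguments()[0];
        }
    }

    public static void main(String[] args) {
        Type listOfString = new TypeCapture<List<String>>() {}.captured;
        System.out.println(listOfString.getTypeName()); // java.util.List<java.lang.String>
    }
}
```

Without the trailing {} there is no subclass, so there would be no generic superclass to inspect; that is why both TypeHint and this sketch are abstract and used as anonymous subclasses.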

ListTypeInfo class


See here for more details.



Arv*_*ise 6

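Marius already explained the reason nicely, although I don't see why Flink does not support such a use case out of the box. Nevertheless, I'll add the solution that works now.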

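// imports (ImmutableMap is from Guava; any Map<String, TypeInformation<?>> works)
import com.google.common.collect.ImmutableMap;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.core.memory.DataInputDeserializer;
import org.apache.flink.core.memory.DataOutputSerializer;

// create type info; the map keys must match the POJO field names
final TypeInformation<OtherPojo> otherPojoInfo = Types.POJO(OtherPojo.class, 
    ImmutableMap.of("myMap", Types.MAP(Types.STRING, Types.STRING)));
final TypeInformation<MyPojo> myPojoInfo = Types.POJO(MyPojo.class,
    ImmutableMap.of("myList", Types.LIST(Types.STRING), "otherPojo", otherPojoInfo));

// test it (inside a method that declares throws IOException)
final MyPojo myPojo = new MyPojo();
myPojo.getMyList().add("test");
myPojo.getOtherPojo().getMyMap().put("ping", "pong");

// env is the StreamExecutionEnvironment of the job
final TypeSerializer<MyPojo> serializer = myPojoInfo.createSerializer(env.getConfig());
DataOutputSerializer dataOutputSerializer = new DataOutputSerializer(100);
serializer.serialize(myPojo, dataOutputSerializer);

DataInputDeserializer dataInputDeserializer = new DataInputDeserializer(dataOutputSerializer.getSharedBuffer());
final MyPojo clone = serializer.deserialize(dataInputDeserializer);
// MyPojo and OtherPojo must override equals() for this check to be meaningful
assert(myPojo.equals(clone));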

Note that the horrible access pattern in the test code is just for a quick and dirty demonstration.
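Types.LIST(Types.STRING) and Types.MAP(Types.STRING, Types.STRING) avoid Kryo because list and map serializers are composed from element serializers. For example, Flink's ListSerializer from the question wraps an element serializer, as in new ListSerializer<>(StringSerializer.INSTANCE). The plain-Java sketch below illustrates only that composition idea with a simple length-prefixed format. The Ser interface and the listOf, mapOf and roundTrip helpers are hypothetical. They do not reproduce Flink's actual wire format, its null handling, or its snapshot/compatibility machinery.

```java
import java.io.*;
import java.util.*;

public class ComposedSerializerSketch {

    // Hypothetical minimal serializer interface (not Flink's TypeSerializer).
    public interface Ser<T> {
        void write(T value, DataOutput out) throws IOException;
        T read(DataInput in) throws IOException;
    }

    // Leaf serializer for non-null strings, using modified UTF-8 (writeUTF/readUTF).
    public static final Ser<String> STRING = new Ser<String>() {
        public void write(String value, DataOutput out) throws IOException { out.writeUTF(value); }
        public String read(DataInput in) throws IOException { return in.readUTF(); }
    };

    // List serializer composed from an element serializer: int size prefix, then each element.
    public static <T> Ser<List<T>> listOf(Ser<T> elementSer) {
        return new Ser<List<T>>() {
            public void write(List<T> list, DataOutput out) throws IOException {
                out.writeInt(list.size());
                for (T element : list) elementSer.write(element, out);
            }
            public List<T> read(DataInput in) throws IOException {
                int size = in.readInt();
                List<T> list = new ArrayList<>(size);
                for (int i = 0; i < size; i++) list.add(elementSer.read(in));
                return list;
            }
        };
    }

    // Map serializer composed from key and value serializers: int size prefix, then key/value pairs.
    public static <K, V> Ser<Map<K, V>> mapOf(Ser<K> keySer, Ser<V> valueSer) {
        return new Ser<Map<K, V>>() {
            public void write(Map<K, V> map, DataOutput out) throws IOException {
                out.writeInt(map.size());
                for (Map.Entry<K, V> entry : map.entrySet()) {
                    keySer.write(entry.getKey(), out);
                    valueSer.write(entry.getValue(), out);
                }
            }
            public Map<K, V> read(DataInput in) throws IOException {
                int size = in.readInt();
                Map<K, V> map = new HashMap<>();
                for (int i = 0; i < size; i++) {
                    K key = keySer.read(in);
                    map.put(key, valueSer.read(in));
                }
                return map;
            }
        };
    }

    // Writes the value to a byte array and reads it back with the same serializer.
    public static <T> T roundTrip(Ser<T> ser, T value) {
        try {
            ByteArrayOutputStream bytes = new ByteArrayOutputStream();
            DataOutputStream out = new DataOutputStream(bytes);
            ser.write(value, out);
            out.flush();
            return ser.read(new DataInputStream(new ByteArrayInputStream(bytes.toByteArray())));
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        List<String> myList = roundTrip(listOf(STRING), Arrays.asList("test"));
        Map<String, String> myMap = roundTrip(mapOf(STRING, STRING), Collections.singletonMap("ping", "pong"));
        System.out.println(myList + " " + myMap); // [test] {ping=pong}
    }
}
```

Nesting falls out of the composition. For instance, listOf(listOf(STRING)) handles a List<List<String>>, much as Types.LIST and Types.MAP are nested inside Types.POJO in the answer above.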