我有以下 Flink 作业,我尝试使用后端类型 RockDB 的键控流状态函数(MapState),
environment
.addSource(consumer).name("MyKafkaSource").uid("kafka-id")
.flatMap(pojoMapper).name("MyMapFunction").uid("map-id")
.keyBy(new MyKeyExtractor())
.map(new MyRichMapFunction()).name("MyRichMapFunction").uid("rich-map-id")
.addSink(sink).name("MyFileSink").uid("sink-id")
Run Code Online (Sandbox Code Playgroud)
MyRichMapFunction 是一个有状态函数,它扩展了 RichMapFunction,它具有以下代码,
public static class MyRichMapFunction extends RichMapFunction<MyEvent, MyEvent> {
private transient MapState<String, Boolean> cache;
@Override
public void open(Configuration config) {
MapStateDescriptor<String, Boolean> descriptor =
new MapStateDescriptor("seen-values", TypeInformation.of(new TypeHint<String>() {}), TypeInformation.of(new TypeHint<Boolean>() {}));
cache = getRuntimeContext().getMapState(descriptor);
}
@Override
public MyEvent map(MyEvent value) throws Exception {
if (cache.contains(value.getEventId())) {
value.setIsSeenAlready(Boolean.TRUE);
return value;
}
value.setIsSeenAlready(Boolean.FALSE);
cache.put(value.getEventId(), Boolean.TRUE)
return value;
}
}
Run Code Online (Sandbox Code Playgroud)
将来,我想重新调整并行度(从 2 到 4),所以我的问题是,如何实现可重新扩展的键控状态,以便在更改并行度后我可以将相应的缓存键控数据获取到其相应的任务槽。我试图探索这一点,我在这里找到了一个文档。据此,可以通过使用ListCheckPointed接口来实现可重新扩展的操作符状态,该接口为此提供了snapshotState/restoreState方法。但不确定如何实现可重新扩展的键控状态(MyRichMapFunction)?我是否需要为 MyRichMapFunction …
假设我开始一个新的 Flink Java 项目,如果我寻找“稳定的 Flink Java 生产体验”,我应该使用哪个版本?官方文档说它从 Flink 1.10 开始可以支持 Java-11,但很多用户仍在使用 Java-8,因此尝试了解我是否需要使用AdoptOpenJDK-8或AdoptOpenJDK-11。