Flink 可重新扩展的键控流状态函数

Inv*_*ble 3 apache-flink flink-streaming

我有以下 Flink 作业,我尝试使用后端类型 RockDB 的键控流状态函数(MapState),

environment
.addSource(consumer).name("MyKafkaSource").uid("kafka-id")
.flatMap(pojoMapper).name("MyMapFunction").uid("map-id")
.keyBy(new MyKeyExtractor())
.map(new MyRichMapFunction()).name("MyRichMapFunction").uid("rich-map-id")
.addSink(sink).name("MyFileSink").uid("sink-id")
Run Code Online (Sandbox Code Playgroud)

MyRichMapFunction 是一个有状态函数,它扩展了 RichMapFunction,它具有以下代码,

public static class MyRichMapFunction extends RichMapFunction<MyEvent, MyEvent> {
    private transient MapState<String, Boolean> cache;
    @Override
    public void open(Configuration config) {
        MapStateDescriptor<String, Boolean> descriptor =
                new MapStateDescriptor("seen-values", TypeInformation.of(new TypeHint<String>() {}), TypeInformation.of(new TypeHint<Boolean>() {}));
        cache = getRuntimeContext().getMapState(descriptor);
    }
    @Override
    public MyEvent map(MyEvent value) throws Exception {
        if (cache.contains(value.getEventId())) {
            value.setIsSeenAlready(Boolean.TRUE);
            return value;
        }
        value.setIsSeenAlready(Boolean.FALSE);
        cache.put(value.getEventId(), Boolean.TRUE)
        return value;
    }
}
Run Code Online (Sandbox Code Playgroud)

将来,我想重新调整并行度(从 2 到 4),所以我的问题是,如何实现可重新扩展的键控状态,以便在更改并行度后我可以将相应的缓存键控数据获取到其相应的任务槽。我试图探索这一点,我在这里找到了一个文档。据此,可以通过使用ListCheckPointed接口来实现可重新扩展的操作符状态,该接口为此提供了snapshotState/restoreState方法。但不确定如何实现可重新扩展的键控状态(MyRichMapFunction)?我是否需要为 MyRichMapFunction 类实现 ListCheckPointed 接口?如果是,我如何根据restoreState方法上的新并行密钥散列重新分配缓存(我的MapState将在启用TTL的情况下保存大量密钥,假设它在任何时间点最多将保存10亿个密钥)?有人可以帮助我解决这个问题吗?或者如果你给我指出任何一个也很棒的例子。

Dav*_*son 5

您编写的代码已经可以重新扩展;Flink 的托管键控状态在设计上是可重新扩展的。通过重新平衡实例的键分配来重新调整键控状态。(您可以将键控状态视为分片键/值存储。从技术上讲,发生的情况是使用一致散列将键映射到键组,并且每个并行实例负责某些键组。重新缩放只涉及重新分配键实例之间的组。)

ListCheckpointed接口用于非键控上下文中使用的状态,因此它不适合您正在做的事情。另请注意,ListCheckpointed在 Flink 1.11 中将不再推荐使用更通用的CheckpointedFunction.

另一件事:如果MyKeyExtractor是通过 进行键控value.getEventId(),那么您可以将ValueState<Boolean>其用于缓存,而不是MapState<String, Boolean>。这是有效的,因为对于键控状态,每个键都有一个单独的 ValueState 值。仅当您需要为流中的每个键存储多个属性/值对时,才需要使用 MapState。

其中大部分内容在 Flink 文档的实践培训中进行了讨论,其中包括一个与您正在做的事情非常接近的示例。