小编Inv*_*ble的帖子

Flink 可重新扩展的键控流状态函数

我有以下 Flink 作业,我尝试使用后端类型 RockDB 的键控流状态函数(MapState),

environment
.addSource(consumer).name("MyKafkaSource").uid("kafka-id")
.flatMap(pojoMapper).name("MyMapFunction").uid("map-id")
.keyBy(new MyKeyExtractor())
.map(new MyRichMapFunction()).name("MyRichMapFunction").uid("rich-map-id")
.addSink(sink).name("MyFileSink").uid("sink-id")
Run Code Online (Sandbox Code Playgroud)

MyRichMapFunction 是一个有状态函数,它扩展了 RichMapFunction,它具有以下代码,

public static class MyRichMapFunction extends RichMapFunction<MyEvent, MyEvent> {
    private transient MapState<String, Boolean> cache;
    @Override
    public void open(Configuration config) {
        MapStateDescriptor<String, Boolean> descriptor =
                new MapStateDescriptor("seen-values", TypeInformation.of(new TypeHint<String>() {}), TypeInformation.of(new TypeHint<Boolean>() {}));
        cache = getRuntimeContext().getMapState(descriptor);
    }
    @Override
    public MyEvent map(MyEvent value) throws Exception {
        if (cache.contains(value.getEventId())) {
            value.setIsSeenAlready(Boolean.TRUE);
            return value;
        }
        value.setIsSeenAlready(Boolean.FALSE);
        cache.put(value.getEventId(), Boolean.TRUE)
        return value;
    }
}
Run Code Online (Sandbox Code Playgroud)

将来,我想重新调整并行度(从 2 到 4),所以我的问题是,如何实现可重新扩展的键控状态,以便在更改并行度后我可以将相应的缓存键控数据获取到其相应的任务槽。我试图探索这一点,我在这里找到了一个文档。据此,可以通过使用ListCheckPointed接口来实现可重新扩展的操作符状态,该接口为此提供了snapshotState/restoreState方法。但不确定如何实现可重新扩展的键控状态(MyRichMapFunction)?我是否需要为 MyRichMapFunction …

apache-flink flink-streaming

3
推荐指数
1
解决办法
572
查看次数

2022年Flink可以支持什么Java版本?

假设我开始一个新的 Flink Java 项目,如果我寻找“稳定的 Flink Java 生产体验”,我应该使用哪个版本?官方文档说它从 Flink 1.10 开始可以支持 Java-11,但很多用户仍在使用 Java-8,因此尝试了解我是否需要使用AdoptOpenJDK-8AdoptOpenJDK-11

java apache-flink

3
推荐指数
1
解决办法
6222
查看次数

AggregateFunction 中 merge 方法的含义

我试图理解 Flink 中的 AggregateFunction ,这里描述了这一点。它总共有四种方法,即

  1. 创建累加器
  2. 添加
  3. 获取结果
  4. 合并

据我了解,

createAccumulator当第一个元素进入新窗口时调用该方法,并且新创建的实例将被进一步使用

addcreateAccumulator调用方法以根据定义减少结果,这使用在方法中创建的实例

getResult当窗口关闭时调用方法并返回可用结果

我对上述方法的理解是否正确?最后,方法的用例是什么merge以及何时使用/调用它?这里的定义对我来说并不清楚。

apache-flink flink-streaming

2
推荐指数
1
解决办法
772
查看次数

标签 统计

apache-flink ×3

flink-streaming ×2

java ×1