Ksp*_*ace 1 apache-flink flink-streaming
我在一个 java 类中定义某些变量,并使用不同的类访问它,以便过滤流中的唯一元素。请参考代码以更好地理解该问题。
我面临的问题是这个过滤器功能不能很好地工作并且无法过滤独特的事件。我怀疑该变量在不同线程之间共享,这就是原因!?如果这不是正确的方法,请建议另一种方法。提前致谢。
**ClassWithVariables.java**
public static HashMap<String, ArrayList<String>> uniqueMap = new HashMap<>();
**FilterClass.java**
public boolean filter(String val) throws Exception {
if(ClassWithVariables.uniqueMap.containsKey(key)) {
Arraylist<String> al = uniqueMap.get(key);
if(al.contains(val) {
return false;
} else {
//Update the hashmap list(uniqueMap)
return true;
}
} else {
//Add to hashmap list(uniqueMap)
return true;
}
}
Run Code Online (Sandbox Code Playgroud)
去重复流的正确方法包括按键对流进行分区,以便包含相同键的所有元素将由同一个工作器处理,并使用 flink 的托管、键控状态机制,以便状态具有容错性和可重新扩展。这是一个示例实现:
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.addSource(new EventSource())
.keyBy(e -> e.key)
.flatMap(new Deduplicate())
.print();
env.execute();
}
public static class Deduplicate extends RichFlatMapFunction<Event, Event> {
ValueState<Boolean> seen;
@Override
public void open(Configuration conf) {
ValueStateDescriptor<Boolean> desc = new ValueStateDescriptor<>("seen", Types.BOOLEAN);
seen = getRuntimeContext().getState(desc);
}
@Override
public void flatMap(Event event, Collector<Event> out) throws Exception {
if (seen.value() == null) {
out.collect(event);
seen.update(true);
}
}
}
Run Code Online (Sandbox Code Playgroud)
顺便说一句,这也可以实现为 RichFilterFunction。但请注意,如果您有无限的密钥空间,则正在使用的状态将无限增长,直到耗尽堆或磁盘上的空间,具体取决于您选择的 Flink 状态后端。如果这是一个问题,您可能需要通过State Time-to-Live设置状态保留策略。
另请注意,Flink 管道的不同部分之间不可能共享状态。与看似正常的情况相比,您需要将事情从内到外翻转,并将事件流带入状态,而不是获取它。
| 归档时间: |
|
| 查看次数: |
1857 次 |
| 最近记录: |