有人可以正确解释事件时间戳和水印。我从文档中理解它,但不是很清楚。现实生活中的例子或外行定义会有所帮助。另外,如果可以的话,请举一个例子(连同一些可以解释它的代码片段)。提前致谢
我想ProcessWindowFunction在我的Apache Flink项目中使用。但是使用过程函数时出现一些错误,请参见下面的代码片段
错误是:
在类型WindowedStream,元组,TimeWindow>的方法处理(ProcessWindowFunction,R,元组,TimeWindow>)是不适用的参数(JDBCExample.MyProcessWindows)
我的程序:
DataStream<Tuple2<String, JSONObject>> inputStream;
inputStream = env.addSource(new JsonArraySource());
inputStream.keyBy(0)
.window(TumblingEventTimeWindows.of(Time.minutes(10)))
.process(new MyProcessWindows());
Run Code Online (Sandbox Code Playgroud)
我的ProcessWindowFunction:
private class MyProcessWindows
extends ProcessWindowFunction<Tuple2<String, JSONObject>, Tuple2<String, String>, String, Window>
{
public void process(
String key,
Context context,
Iterable<Tuple2<String, JSONObject>> input,
Collector<Tuple2<String, String>> out) throws Exception
{
...
}
}
Run Code Online (Sandbox Code Playgroud) 我在一个 java 类中定义某些变量,并使用不同的类访问它,以便过滤流中的唯一元素。请参考代码以更好地理解该问题。
我面临的问题是这个过滤器功能不能很好地工作并且无法过滤独特的事件。我怀疑该变量在不同线程之间共享,这就是原因!?如果这不是正确的方法,请建议另一种方法。提前致谢。
**ClassWithVariables.java**
public static HashMap<String, ArrayList<String>> uniqueMap = new HashMap<>();
**FilterClass.java**
public boolean filter(String val) throws Exception {
if(ClassWithVariables.uniqueMap.containsKey(key)) {
Arraylist<String> al = uniqueMap.get(key);
if(al.contains(val) {
return false;
} else {
//Update the hashmap list(uniqueMap)
return true;
}
} else {
//Add to hashmap list(uniqueMap)
return true;
}
}
Run Code Online (Sandbox Code Playgroud)