Piy*_*ava 0 apache-flink flink-streaming
我想创建一个Trigger第一次在20秒内触发,此后每5秒触发一次的触发。我已经习惯GlobalWindows了Trigger
val windowedStream = valueStream
.keyBy(0)
.window(GlobalWindows.create())
.trigger(TradeTrigger.of())
Run Code Online (Sandbox Code Playgroud)
这是中的代码TradeTrigger:
@PublicEvolving
public class TradeTrigger<W extends Window> extends Trigger<Object, W> {
private static final long serialVersionUID = 1L;
static boolean flag=false;
static long ctime = System.currentTimeMillis();
private TradeTrigger() {
}
@Override
public TriggerResult onElement(
Object arg0,
long arg1,
W arg2,
org.apache.flink.streaming.api.windowing.triggers.Trigger.TriggerContext arg3)
throws Exception {
// TODO Auto-generated method stub
if(flag == false){
if((System.currentTimeMillis()-ctime) >= 20000){
flag = true;
ctime = System.currentTimeMillis();
return TriggerResult.FIRE;
}
return TriggerResult.CONTINUE;
} else {
if((System.currentTimeMillis()-ctime) >= 5000){
ctime = System.currentTimeMillis();
return TriggerResult.FIRE;
}
return TriggerResult.CONTINUE;
}
}
@Override
public TriggerResult onEventTime(
long arg0,
W arg1,
org.apache.flink.streaming.api.windowing.triggers.Trigger.TriggerContext arg2)
throws Exception {
// TODO Auto-generated method stub
return TriggerResult.CONTINUE;
}
@Override
public TriggerResult onProcessingTime(
long arg0,
W arg1,
org.apache.flink.streaming.api.windowing.triggers.Trigger.TriggerContext arg2)
throws Exception {
// TODO Auto-generated method stub
return TriggerResult.CONTINUE;
}
public static <W extends Window> TradeTrigger<W> of() {
return new TradeTrigger<>();
}
}
Run Code Online (Sandbox Code Playgroud)
因此,基本上,当flag是时false,即第一次,Trigger应在20秒内触发并将设置flag为true。从下一次开始,应该每5秒触发一次。
我面临的问题是,每次Trigger触发时,我在输出中仅收到一条消息。也就是说,我在20秒后收到一条消息,每五秒收到一条消息。我希望每次触发时在输出中显示20条消息。
如果我使用.timeWindow(Time.seconds(5))并创建一个五秒钟的时间窗口,则每五秒钟输出20条消息。请帮助我正确编写此代码。我有什么想念的吗?
触发器实现存在一些问题:
您永远不要将函数的状态存储在静态变量中。Flink不会隔离JVM中的用户进程。而是每个TaskManager使用一个JVM,并启动多个线程。因此,您的静态布尔标志在触发器的多个实例之间共享。您应该存储Flink标志的ValueState界面,该界面可以从访问TriggerContext。Flink将谨慎检查您的状态,并在出现故障时恢复状态。
Trigger.onEvent()仅在有新事件到达时调用。因此,它不能用于在特定时间触发Window计算。相反,您应该注册一个事件时间计时器或处理时间计时器(再次通过TriggerContext)。计时器将分别呼叫Trigger.onEventTime()或Trigger.onProcessingTime()。使用事件还是处理时间取决于您的用例。
| 归档时间: |
|
| 查看次数: |
970 次 |
| 最近记录: |