Flink计时器未按时执行

Asa*_*lik 1 java apache-flink flink-streaming

这是一个后续问题:状态到期时触发

我在流中存储每个传入元素的状态,在计时器关闭后,我删除状态。这样我就可以防止处理重复项,直到元素超时,然后我可以再次处理相同的元素。一世

我编写了以下代码来测试计时器,但似乎在所有 3 个元素都通过第一个ProcessFunction.

public static void main(String[] args) throws Exception {
    StreamExecutionEnvironment streamEnv = StreamExecutionEnvironment.getExecutionEnvironment();
    streamEnv.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime);
    streamEnv.setParallelism(12);

    List<Tuple2<String, String>> inputList = new ArrayList<>();
    inputList.add(new Tuple2<>("Test", "test"));
    inputList.add(new Tuple2<>("Test", "test"));
    inputList.add(new Tuple2<>("Test", "test"));

    streamEnv.fromCollection(inputList).keyBy(0)
            .process(new ProcessFunction<Tuple2<String, String>, Tuple2<String, String>>() {
                ValueState<Integer> occur;

                @Override
                public void open(Configuration parameters) throws Exception {
                    occur = getRuntimeContext().getState(new ValueStateDescriptor<Integer>("occurs", Integer.class, 0));
                }

                @Override
                public void processElement(Tuple2<String, String> value, Context ctx, Collector<Tuple2<String, String>> out) throws Exception {
                    if (occur.value() < 2) {
                        occur.update(occur.value() + 1);
                        out.collect(value);
                        LOGGER.info("[TEST] Outputting Tuple {}", value);
                    }
                    else {
                        Thread.sleep(10000);
                        LOGGER.info("[TEST] Outputting Tuple {}", value);
                        out.collect(value);
                    }
                }
            })
            .keyBy(0)
            .process(new ProcessFunction<Tuple2<String, String>, Tuple2<String, String>>() {
                ValueState<Tuple2<String, String>> storedTuple;

                @Override
                public void open(Configuration parameters) throws Exception {
                    storedTuple = getRuntimeContext().getState(new ValueStateDescriptor<>("storedTuple",
                            TypeInformation.of(new TypeHint<Tuple2<String, String>>() {})));
                }

                @Override
                public void processElement(Tuple2<String, String> value, Context ctx, Collector<Tuple2<String, String>> out) throws Exception {
                    Tuple2<String, String> stored = storedTuple.value();
                    if (stored == null) {
                        LOGGER.info("[TEST] Storing Tuple {}", value);
                        storedTuple.update(value);
                        out.collect(value);
                        ctx.timerService().registerEventTimeTimer(ctx.timestamp() + 6000);
                    }
                }
            }

            @Override
                public void onTimer(long timestamp, OnTimerContext ctx, Collector<Tuple2<String, String>> out) throws Exception {
                    LOGGER.info("[TEST] Removing Tuple {}", storedTuple.value());
                    storedTuple.clear();
                }    
            )
            .addSink(new CollectSink());

    streamEnv.execute("Testing");
    for (Tuple2<String, String> tup: CollectSink.values) {
        System.out.println(tup);
    }

}

private static class CollectSink implements SinkFunction<Tuple2<String, String>> {

    static final List<Tuple2<String, String>> values = new ArrayList<>();

    @Override
    public synchronized void invoke(Tuple2<String, String> value) throws Exception {
        values.add(value);
    }
}
Run Code Online (Sandbox Code Playgroud)

我有一个包含 3 个重复元素的输入列表。在第一个中,ProcessFunction我按原样发送前两个元素,但将第三个元素延迟 10 秒。

在第二个中,ProcessFunction它根据是否为其存储状态来过滤元素。正如预期的那样,第一个元素被存储并继续发送,第二个元素不是因为状态已经存在。对于第一个元素,除了继续发送之外,我还设置了一个 6 秒的计时器,以便在触发计时器后清除状态。

现在第三个元素在 10 秒后发送,这意味着 6 秒触发器应该已经清除了状态。然而,第三个元素也在定时器触发之前被处理。我也可以看到输出只包含 1 个元组副本,即使我预计有 2 个副本。

我添加了一些日志来更好地了解执行时间。

[2019-02-19 14:11:48,891] [Process (1/12)] INFO  FlinkTest - [TEST] Outputting Tuple (Test,test)
[2019-02-19 14:11:48,891] [Process (1/12)] INFO  FlinkTest - [TEST] Outputting Tuple (Test,test)
[2019-02-19 14:11:48,943] [Process -> Sink: Unnamed (1/12)] INFO  FlinkTest - [TEST] Storing Tuple (Test,test)
[2019-02-19 14:11:58,891] [Process (1/12)] INFO  FlinkTest - [TEST] Outputting Tuple (Test,test)
[2019-02-19 14:11:58,896] [Process -> Sink: Unnamed (1/12)] INFO  FlinkTest - [TEST] Removing Tuple (Test,test)
Run Code Online (Sandbox Code Playgroud)

您可以看到前两个元组按预期一起发出,然后延迟 10 秒,然后发出第三个元组。现在Removing Tuple10 秒后发生,即使它在第一个元组进入的 6 秒后被触发。

Dav*_*son 6

事件时间计时器在处理大于计时器中指定时间的水印之前不会触发。直到处理完第三个事件之后,才会出现这样的水印。此外,随着摄取时间,水印是使用周期性水印生成器生成的,默认情况下每 200 毫秒插入一次流中。

  • 我刚刚尝试了`ctx.timerService().registerProcessingTimeTimer(ctx.timerService().currentProcessingTime() + 6000);`,现在工作按预期运行。我不完全确定事件时间计时器和水印发生了什么。 (3认同)
  • 当有限源到达其末尾时,将创建值为 MAX_WATERMARK 的水印并通过作业图发送,该图触发所有事件时间计时器。 (2认同)