Apache Flink 1.0.0: event-time related migration issue

Ale*_*mir 5 apache-flink flink-streaming

I tried to migrate some simple tasks to Flink 1.0.0, but they fail with the following exception:

java.lang.RuntimeException: Record has Long.MIN_VALUE timestamp (= no timestamp marker). Is the time characteristic set to 'ProcessingTime', or did you forget to call 'DataStream.assignTimestampsAndWatermarks(...)'?

The code consists of two separate tasks connected through a Kafka topic: one is a simple message producer, and the other is a simple consumer that uses timeWindowAll to compute the per-minute message arrival rate.

Similar code ran without any problems on version 0.10.2, but now the system appears to interpret some event timestamps as Long.MIN_VALUE, which causes the task to fail.

The question is: am I doing something wrong, or is this a bug that will be fixed in a future release?

Main task:

public class Test1_0_0 {
    // Max Time lag between events time to current System time
    static final Time maxTimeLag = Time.of(3, TimeUnit.SECONDS);

    public static void main(String[] args) throws Exception {
        // set up the execution environment
        final StreamExecutionEnvironment env = StreamExecutionEnvironment
                .getExecutionEnvironment();
        // Setting Event Time usage
        env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
        env.setBufferTimeout(1);
        // Properties initialization
        Properties properties = new Properties();

        properties.setProperty("bootstrap.servers", "localhost:9092");
        properties.setProperty("zookeeper.connect", "localhost:2181");
        properties.setProperty("group.id", "test");

        // Overwrites the default properties by one provided by command line
        ParameterTool parameterTool = ParameterTool.fromArgs(args);
        for(Map.Entry<String, String> e: parameterTool.toMap().entrySet()) {
            properties.setProperty(e.getKey(),e.getValue());
        }
        //properties.setProperty("auto.offset.reset", "smallest");
        System.out.println("Properties: " + properties);
        DataStream<Message> stream = env
                .addSource(new FlinkKafkaConsumer09<Message>("test", new MessageSDSchema(), properties))
                .filter(message -> message != null);
        // The call to rebalance() causes data to be re-partitioned so that all machines receive messages
        // (for example, when the number of Kafka partitions is fewer than the number of Flink parallel instances).
        stream.rebalance()
                .assignTimestampsAndWatermarks(new MessageTimestampExtractor(maxTimeLag));
        // Counts messages
        stream.timeWindowAll(Time.minutes(1)).apply(new AllWindowFunction<Message, String, TimeWindow>() {
            @Override
            public void apply(TimeWindow timeWindow, Iterable<Message> values, Collector<String> collector) throws Exception {
                Integer count = 0;
                if (values.iterator().hasNext()) {
                    for (Message value : values) {
                        count++;
                    }
                    collector.collect("Arrived last minute: " + count);
                }
            }
        }).print();
        // execute program
        env.execute("Messages Counting");
    }
 }

Til*_*ann 3

The problem is that the call to assignTimestampsAndWatermarks returns a new DataStream which uses the timestamp extractor. You therefore have to apply the subsequent operations to the returned DataStream.

DataStream<Message> timestampStream = stream.rebalance()
        .assignTimestampsAndWatermarks(new MessageTimestampExtractor(maxTimeLag));
// Counts messages
timestampStream.timeWindowAll(Time.minutes(1)).apply(new AllWindowFunction<Message, String, TimeWindow>() {
    @Override
    public void apply(TimeWindow timeWindow, Iterable<Message> values, Collector<String> collector) throws Exception {
        Integer count = 0;
        if (values.iterator().hasNext()) {
            for (Message value : values) {
                count++;
            }
            collector.collect("Arrived last minute: " + count);
        }
    }
}).print();
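The MessageTimestampExtractor itself is not shown in the question, but the watermark bookkeeping that a periodic assigner with a fixed maximum time lag performs can be sketched in plain Java, independent of the Flink API. This is a hypothetical illustration (class name and structure are assumptions, not the asker's actual code): track the highest timestamp seen so far and emit watermarks that trail it by maxTimeLag, using Long.MIN_VALUE as the "no timestamp yet" sentinel mentioned in the exception message.

```java
import java.util.concurrent.TimeUnit;

// Hypothetical sketch of the bookkeeping a periodic watermark assigner performs.
// Not the actual MessageTimestampExtractor from the question.
public class WatermarkSketch {
    private final long maxTimeLagMs;
    private long currentMaxTimestamp = Long.MIN_VALUE;

    public WatermarkSketch(long maxTimeLagMs) {
        this.maxTimeLagMs = maxTimeLagMs;
    }

    // Called once per element: remember the highest timestamp seen,
    // then use the element's own timestamp for windowing.
    public long extractTimestamp(long elementTimestamp) {
        currentMaxTimestamp = Math.max(currentMaxTimestamp, elementTimestamp);
        return elementTimestamp;
    }

    // Called periodically: the watermark lags the highest seen timestamp
    // by maxTimeLag, allowing events to arrive that much out of order.
    public long getCurrentWatermark() {
        return currentMaxTimestamp == Long.MIN_VALUE
                ? Long.MIN_VALUE                      // no elements yet: no watermark
                : currentMaxTimestamp - maxTimeLagMs;
    }

    public static void main(String[] args) {
        WatermarkSketch sketch = new WatermarkSketch(TimeUnit.SECONDS.toMillis(3));
        sketch.extractTimestamp(10_000L);
        sketch.extractTimestamp(8_000L); // out-of-order element does not move the max
        // watermark trails the highest seen timestamp by the 3-second lag
        System.out.println(sketch.getCurrentWatermark()); // prints 7000
    }
}
```

Note that before any element is processed the sketch returns Long.MIN_VALUE, which is exactly the "no timestamp marker" value from the exception: in the original code the records carried no timestamps downstream because the assigner was applied to a stream whose result was discarded.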