Flink - 如何使用 withTimestampAssigner 从事件负载中获取时间(不使用 Kafka 时间戳)

Sim*_*Azz 4 apache-kafka flink-streaming

我试图了解如何在 Kafka Source 的WatermarkStrategy中使用withTimestampAssigner()。我需要使用的“时间”位于消息有效负载内。

为此,我有以下代码:

FlinkKafkaConsumer<Event> kafkaData =
        new FlinkKafkaConsumer("CorID_0", new EventDeserializationSchema(), p);
kafkaData.assignTimestampsAndWatermarks(
        WatermarkStrategy
        .forMonotonousTimestamps()
                .withTimestampAssigner(Event, Event.time))

DataStream<Event> stream = env.addSource(kafkaData);
Run Code Online (Sandbox Code Playgroud)

其中 EventDeserializationSchema() 是这样的:

public class EventDeserializationSchema implements DeserializationSchema<Event> {

    private static final long serialVersionUID = 1L;
    
    private static final CsvSchema schema = CsvSchema.builder()
            .addColumn("firstName")
            .addColumn("lastName")
            .addColumn("age", CsvSchema.ColumnType.NUMBER)
            .addColumn("time")
            .build();

    private static final ObjectMapper mapper = new CsvMapper();

    @Override
    public Event deserialize(byte[] message) throws IOException {
        return mapper.readerFor(Event.class).with(schema).readValue(message);
    }

    @Override
    public boolean isEndOfStream(Event nextElement) {
        return false;
    }

    @Override
    public TypeInformation<Event> getProducedType() {
        
        return TypeInformation.of(Event.class);
    }
}
Run Code Online (Sandbox Code Playgroud)

和事件:

import java.io.Serializable;

public class Event implements Serializable {
    public String firstName;
    public String lastName;
    private int age;
    public String time;

    public Event() {
    }

    public String getFirstName() {
        return firstName;
    }

    public void setFirstName(String firstName) {
        this.firstName = firstName;
    }

    public String getLastName() {
        return lastName;
    }

    public void setLastName(String lastName) {
        this.lastName = lastName;
    }

    public int getAge() {
        return age;
    }

    public void setAge(int age) {
        this.age = age;
    }

    public String getTime() {
        return time;
    }

    public void setTime(String time) {
        this.time = time;
    }
}
Run Code Online (Sandbox Code Playgroud)

我想了解的是如何向withTimeStampAssigner()提供时间

.withTimestampAssigner(???))
Run Code Online (Sandbox Code Playgroud)

该变量应该是Event.time但从 flink 页面我不太明白。

在此输入图像描述

我一直在寻找

在此输入图像描述

这让我有点困惑,因为我不明白就我的情况而言,解决方案是否非常简单,或者我需要额外的背景信息。我发现的所有示例都使用 .forBoundedOutOfOrderness() 或 flink 的早期版本,其中实现方式有所不同,如下所示:

kafka flink timestamp 事件时间和水印

谢谢!

Dav*_*son 5

如果源(例如,FlinkKafkaConsumer)没有提供您想要使用的时间戳,那么您需要提供TimestampAssigner. 这是一个函数,它将事件和先前分配的时间戳(如果有)作为输入,并返回时间戳。在你的情况下,可能看起来像这样:

FlinkKafkaConsumer<Event> kafkaData =
        new FlinkKafkaConsumer("CorID_0", new EventDeserializationSchema(), p);

WatermarkStrategy<Event> wmStrategy = 
        WatermarkStrategy
          .<Event>forMonotonousTimestamps()
          .withTimestampAssigner((event, timestamp) -> event.getTime());

DataStream<Event> stream = env.addSource(
        kafkaData.assignTimestampsAndWatermarks(wmStrategy));
Run Code Online (Sandbox Code Playgroud)

(注意:这不太有效,因为您的getTime()方法返回一个字符串。您需要解析该字符串并返回一个长整型——通常它是一个表示自纪元以来的毫秒数的长整型。)

涉及 aTimestampAssignerSupplier.Context或 a 的WatermarkGeneratorSupplier.Context情况适用于您需要访问较低级别的 API 来执行更自定义操作的情况。在这种情况下没有必要。