TumblingWindow 中的 Flink AggregateFunction 自动分割为两个窗口以实现大窗口大小

six*_*ain 2 java stream-processing apache-kafka docker apache-flink

我正在使用不同的窗口大小计算一些记录的简单平均值。使用1 小时1 周的窗口没有问题,并且结果计算正确。

var keyed = src
        .filter(event -> event.getSensor_id() < 10000)
        .keyBy(Event::getSensor_id);

var hourResult = keyed
        .window(TumblingEventTimeWindows.of(Time.hours(1)))
        .aggregate(new AvgQ1(Config.HOUR))
        .setParallelism(5);

var weekResult = keyed
        .window(TumblingEventTimeWindows.of(Time.days(7)))
        .aggregate(new AvgQ1(Config.WEEK))
        .setParallelism(5);
Run Code Online (Sandbox Code Playgroud)

相反,使用1 个月(31 天)的窗口,窗口被分成两半,flink 给出两个结果作为输出,一个用于 05-1 到 05-14 的记录,另一个用于从 05-15 到 05-14 的记录。 31.

SingleOutputOperator<OutputQuery> monthResult = keyed
    .window(TumblingEventTimeWindows.of(Time.days(31)))
    .aggregate(new AvgQ1(Config.MONTH))
    .setParallelism(5);
Run Code Online (Sandbox Code Playgroud)

使用大小为 30 天的窗口,结果会分为 (05-1;05-27) 和 (05-28;05-31)。

SingleOutputOperator<OutputQuery> monthResult = keyed
    .window(TumblingEventTimeWindows.of(Time.days(30)))
    .aggregate(new AvgQ1(Config.MONTH))
    .setParallelism(5);
Run Code Online (Sandbox Code Playgroud)

这是AggregateFunction.

public class AvgQ1 implements AggregateFunction<Event, AccumulatorQ1, OutputQuery> {
    String windowType;
    public AvgQ1(String windowType) {
        this.windowType = windowType;
    }

    public AccumulatorQ1 createAccumulator() {
        return new AccumulatorQ1();
    }

    @Override
    public AccumulatorQ1 add(Event values, AccumulatorQ1 acc) {
        acc.sum += values.getTemperature();
        acc.sensor_id = values.getSensor_id();
        acc.last_timestamp = values.getTimestamp();
        acc.count++;
        return acc;
    }

    @Override
    public AccumulatorQ1 merge(AccumulatorQ1 a, AccumulatorQ1 b) {
        a.count += b.count;
        a.sum += b.sum;
        return a;
    }

    @Override
    public OutQ1 getResult(AccumulatorQ1 acc) {
        double mean = acc.sum / (double) acc.count;
        OutQ1 result = new OutQ1(windowType);
        result.setSensor_id(acc.sensor_id);
        result.setTemperature(mean);
        result.setOccurrences(acc.count);
        if (windowType.equals(Config.HOUR)) {
            result.setTimestamp(Tools.getHourSlot(acc.last_timestamp));
        }
        if (windowType.equals(Config.WEEK)) {
            result.setTimestamp(Tools.getWeekSlot(acc.last_timestamp));
        }
        if (windowType.equals(Config.MONTH)) {
            result.setTimestamp(Tools.getMonthSlot(acc.last_timestamp));
        }
        return result;
    }
}
Run Code Online (Sandbox Code Playgroud)

我认为问题在某种程度上与内存使用有关,就好像累加器或窗口无法容纳太多数据一样。所以我尝试在WebUI中监控jvm堆使用情况,但没有超出限制,并且还将后端状态从hash更改为rockdb。

我在 docker 上使用 Flink,从 kafka 主题读取 DataStream,有什么想法吗?

Dav*_*son 5

该问题与 Flink 基于时间的窗口分配器的工作方式有关。它将自 Unix 纪元 (01-01-1970) 以来的时间划分为指定持续时间的大小相等的块(窗口),然后将传入事件分配到这些块(窗口)中。

因此,对于 30 天长的窗口,这些窗口涵盖以下范围:

01-01-1970 thru 30-01-1970
31-01-1970 thru 01-02-1970
...
29-04-2022 thru 28-05-2022
29-05-2022 thru 27-06-2022
...
Run Code Online (Sandbox Code Playgroud)

这对于一秒、一分钟、一小时、一天甚至一周的窗口来说还可以,但对于一个月的窗口来说就不太方便了。