six*_*ain 2 java stream-processing apache-kafka docker apache-flink
我正在使用不同的窗口大小计算一些记录的简单平均值。使用1 小时和1 周的窗口没有问题,并且结果计算正确。
var keyed = src
.filter(event -> event.getSensor_id() < 10000)
.keyBy(Event::getSensor_id);
var hourResult = keyed
.window(TumblingEventTimeWindows.of(Time.hours(1)))
.aggregate(new AvgQ1(Config.HOUR))
.setParallelism(5);
var weekResult = keyed
.window(TumblingEventTimeWindows.of(Time.days(7)))
.aggregate(new AvgQ1(Config.WEEK))
.setParallelism(5);
Run Code Online (Sandbox Code Playgroud)
相反,使用1 个月(31 天)的窗口,窗口被分成两半,flink 给出两个结果作为输出,一个用于 05-1 到 05-14 的记录,另一个用于从 05-15 到 05-14 的记录。 31.
SingleOutputOperator<OutputQuery> monthResult = keyed
.window(TumblingEventTimeWindows.of(Time.days(31)))
.aggregate(new AvgQ1(Config.MONTH))
.setParallelism(5);
Run Code Online (Sandbox Code Playgroud)
使用大小为 30 天的窗口,结果会分为 (05-1;05-27) 和 (05-28;05-31)。
SingleOutputOperator<OutputQuery> monthResult = keyed
.window(TumblingEventTimeWindows.of(Time.days(30)))
.aggregate(new AvgQ1(Config.MONTH))
.setParallelism(5);
Run Code Online (Sandbox Code Playgroud)
这是AggregateFunction.
public class AvgQ1 implements AggregateFunction<Event, AccumulatorQ1, OutputQuery> {
String windowType;
public AvgQ1(String windowType) {
this.windowType = windowType;
}
public AccumulatorQ1 createAccumulator() {
return new AccumulatorQ1();
}
@Override
public AccumulatorQ1 add(Event values, AccumulatorQ1 acc) {
acc.sum += values.getTemperature();
acc.sensor_id = values.getSensor_id();
acc.last_timestamp = values.getTimestamp();
acc.count++;
return acc;
}
@Override
public AccumulatorQ1 merge(AccumulatorQ1 a, AccumulatorQ1 b) {
a.count += b.count;
a.sum += b.sum;
return a;
}
@Override
public OutQ1 getResult(AccumulatorQ1 acc) {
double mean = acc.sum / (double) acc.count;
OutQ1 result = new OutQ1(windowType);
result.setSensor_id(acc.sensor_id);
result.setTemperature(mean);
result.setOccurrences(acc.count);
if (windowType.equals(Config.HOUR)) {
result.setTimestamp(Tools.getHourSlot(acc.last_timestamp));
}
if (windowType.equals(Config.WEEK)) {
result.setTimestamp(Tools.getWeekSlot(acc.last_timestamp));
}
if (windowType.equals(Config.MONTH)) {
result.setTimestamp(Tools.getMonthSlot(acc.last_timestamp));
}
return result;
}
}
Run Code Online (Sandbox Code Playgroud)
我认为问题在某种程度上与内存使用有关,就好像累加器或窗口无法容纳太多数据一样。所以我尝试在WebUI中监控jvm堆使用情况,但没有超出限制,并且还将后端状态从hash更改为rockdb。
我在 docker 上使用 Flink,从 kafka 主题读取 DataStream,有什么想法吗?
该问题与 Flink 基于时间的窗口分配器的工作方式有关。它将自 Unix 纪元 (01-01-1970) 以来的时间划分为指定持续时间的大小相等的块(窗口),然后将传入事件分配到这些块(窗口)中。
因此,对于 30 天长的窗口,这些窗口涵盖以下范围:
01-01-1970 thru 30-01-1970
31-01-1970 thru 01-02-1970
...
29-04-2022 thru 28-05-2022
29-05-2022 thru 27-06-2022
...
Run Code Online (Sandbox Code Playgroud)
这对于一秒、一分钟、一小时、一天甚至一周的窗口来说还可以,但对于一个月的窗口来说就不太方便了。
| 归档时间: |
|
| 查看次数: |
618 次 |
| 最近记录: |