小编ngi*_*nel的帖子

Flink - 当没有以下事件时发出最后一个窗口

我有一个包含事件的流Event(Id, Type, Date),我想处理按 (Id, Type) 和活动会话分组的那些事件


例如,从事件

Event1: Event(1, T1, 2018-01-24)
Event2: Event(2, T1, 2018-01-26)
Event3: Event(1, T2, 2018-01-28)
Event4: Event(1, T2, 2018-01-28)
...
Run Code Online (Sandbox Code Playgroud)

我希望有以下窗口:

Window1 with Event1
Window2 with Event2
Window3 with Event3 and Event4
...
Run Code Online (Sandbox Code Playgroud)

根据我的理解,我应该能够使用键控流上的事件时间会话窗口来做到这一点。但是使用我的代码,只打印包含第一个事件 (Event1) 的第一个窗口 (Window1)。

environment.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)

environment
    .addSource(kafkaConsumer.setStartFromEarliest())
    .assignTimestampsAndWatermarks(<timestamp assigner>)
    .keyBy(e => (e.getId, e.getType))
    .window(EventTimeSessionWindows.withGap(Time.days(1)))
    .apply(new WindowFunction[Event, String, (String, String), TimeWindow]() {
          override def apply(key: (String, String), window: TimeWindow, input: Iterable[Event], out: Collector[String]): Unit = {
            var count = 0L
            for (in …
Run Code Online (Sandbox Code Playgroud)

apache-flink flink-streaming

5
推荐指数
1
解决办法
1042
查看次数

标签 统计

apache-flink ×1

flink-streaming ×1