ngi*_*nel 5 apache-flink flink-streaming
我有一个包含事件的流Event(Id, Type, Date),我想处理按 (Id, Type) 和活动会话分组的那些事件
例如,从事件
Event1: Event(1, T1, 2018-01-24)
Event2: Event(2, T1, 2018-01-26)
Event3: Event(1, T2, 2018-01-28)
Event4: Event(1, T2, 2018-01-28)
...
Run Code Online (Sandbox Code Playgroud)
我希望有以下窗口:
Window1 with Event1
Window2 with Event2
Window3 with Event3 and Event4
...
Run Code Online (Sandbox Code Playgroud)
根据我的理解,我应该能够使用键控流上的事件时间会话窗口来做到这一点。但是使用我的代码,只打印包含第一个事件 (Event1) 的第一个窗口 (Window1)。
environment.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
environment
.addSource(kafkaConsumer.setStartFromEarliest())
.assignTimestampsAndWatermarks(<timestamp assigner>)
.keyBy(e => (e.getId, e.getType))
.window(EventTimeSessionWindows.withGap(Time.days(1)))
.apply(new WindowFunction[Event, String, (String, String), TimeWindow]() {
override def apply(key: (String, String), window: TimeWindow, input: Iterable[Event], out: Collector[String]): Unit = {
var count = 0L
for (in <- input) {
count = count + 1
}
out.collect(s"Window $window count: $count")
}
})
.print()
Run Code Online (Sandbox Code Playgroud)
这是处理历史事件和会话窗口的适当方式吗?
您的情况的问题是水印始终是根据传入事件生成的。如果没有传入事件,则水印不会进行。在您的示例中,仅发出 Window1,因为仅对于 Event1,还有另一个带有时间戳的后续事件,该事件将 Watermark 推进到会话间隙之外。对于其余三个事件,没有这样的元素。对于 event3 和 event4 根本没有这样的事件。另外,因为流是有键的,所以具有不同键的元素是独立处理的。在这种情况下,水印不会前进,因此不会发出窗口。
| 归档时间: |
|
| 查看次数: |
1042 次 |
| 最近记录: |