小编ban*_*ews的帖子

Flink流媒体事件时间窗口排序

我遇到了一些麻烦,理解事件时间窗口周围的语义.以下程序生成一些带有时间戳的元组,这些时间戳用作事件时间并执行简单的窗口聚合.我希望输出与输入的顺序相同,但输出的排序方式不同.为什么输出与事件时间无关?

import java.util.concurrent.TimeUnit
import org.apache.flink.streaming.api.TimeCharacteristic
import org.apache.flink.streaming.api.windowing.time.Time
import org.apache.flink.streaming.api.scala._

object WindowExample extends App {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
    env.getConfig.enableTimestamps()
    env.setParallelism(1)

    val start = 1449597577379L
    val tuples = (1 to 10).map(t => (start + t * 1000, t))

    env.fromCollection(tuples)
      .assignAscendingTimestamps(_._1)
      .timeWindowAll(Time.of(1, TimeUnit.SECONDS))
      .sum(1)
      .print()

    env.execute()
}
Run Code Online (Sandbox Code Playgroud)

输入:

 (1449597578379,1)
 (1449597579379,2)
 (1449597580379,3)
 (1449597581379,4)
 (1449597582379,5)
 (1449597583379,6)
 (1449597584379,7)
 (1449597585379,8)
 (1449597586379,9)
 (1449597587379,10)
Run Code Online (Sandbox Code Playgroud)

结果:

[info] (1449597579379,2)
[info] (1449597581379,4)
[info] (1449597583379,6)
[info] (1449597585379,8)
[info] (1449597587379,10)
[info] (1449597578379,1)
[info] (1449597580379,3)
[info] (1449597582379,5)
[info] (1449597584379,7)
[info] (1449597586379,9)
Run Code Online (Sandbox Code Playgroud)

apache-flink flink-streaming

10
推荐指数
1
解决办法
2377
查看次数

标签 统计

apache-flink ×1

flink-streaming ×1