如何真正丢弃迟到的记录?

Har*_*mer 2 apache-kafka apache-kafka-streams

这是问题所在:

假设有一个数字流,我想从这些数字中收集 1 小时时段的 MAX,其中我允许给定时段最多 3 小时的延迟。

这听起来像是翻滚窗户的实验室案例。

这是我到目前为止所拥有的:

stream.aggregate(
      () -> 0L,
      (aggKey, value, aggregate) -> Math.max(value, aggregate),
      TimeWindows.of(TimeUnit.HOURS.toMillis(1L)).until(TimeUnit.HOURS.toMillis(3L)),
      Serdes.Long(),
      "my_store"
)
Run Code Online (Sandbox Code Playgroud)

首先,我无法验证这是否确实发生在测试中。时间戳是通过 TimestampExtractor 提取的,我模拟延迟Thread.sleep(我将窗口设置为较小的测试值),但“延迟记录”仍然被处理而不是被丢弃。

常规窗口上似乎很少(没有?)示例。有一个关于 SessionWindows 的集成测试,但就是这样。我是否正确理解了这些概念?

编辑 2

示例 JUnit 测试。由于它相当大,我通过 Gist 分享它。

https://gist.github.com/Hartimer/6018a731753846c1930429716703e5a6

编辑(添加更多代码)

数据点具有时间戳(收集数据的时间)、收集数据的机器的主机名和值。

{
    "collectedAt": 12314124134, // timestamp
    "hostname": "machine-1",
    "reading": 3
}
Run Code Online (Sandbox Code Playgroud)

自定义时间戳提取器用于获取collectedAt. 这是我的管道的更完整表示:

{
    "collectedAt": 12314124134, // timestamp
    "hostname": "machine-1",
    "reading": 3
}
Run Code Online (Sandbox Code Playgroud)

测试的一个片段是

source.map(this::fixKey) // Associates record with a key like "<timestamp>:<hostname>"
  .groupByKey(Serdes.String(), roundDataSerde)
  .aggregate(
          () -> RoundData.EMPTY_ROUND,
          (aggKey, value, aggregate) -> max(value, aggregate),
          TimeWindows.of(TimeUnit.HOURS.toMillis(1L))
                     .until(TimeUnit.SECONDS.toMillis(1L)), // For testing I allow 1 second delay
          roundDataSerde,
          "entries_store"
  )
  .toStream()
  .map(this::simpleRoundDataToAggregate) // Associates record with a key like "<timestamp floored to nearest hour>"
  .groupByKey(aggregateSerde, aggregateSerde)
  .aggregate(
          () -> MyAggregate.EMPTY,
          (aggKey, value, aggregate) -> aggregate.merge(value), // I know this is not idempotent, that's a WIP
          TimeWindows.of(TimeUnit.HOURS.toMillis(1L))
                     .until(TimeUnit.SECONDS.toMillis(1L)), // For testing I allow 1 second delay
          aggregateSerde,
          "result_store"
  )
  .print()
Run Code Online (Sandbox Code Playgroud)

任何帮助将非常感激。

Mic*_*oll 5

认为Hartimer 的自我回答实际上是不正确的。让我试着解释发生了什么,至少就我自己所知。:-)

  • 迟到的数据是根据通过配置的时间戳提取器为您的应用程序配置的时间语义处理的。在@Hartimer 的情况下,这是事件时间(此处使用自定义时间戳提取器)。
  • FWIW,在处理时间的情况下,根据定义,没有迟到的记录:每个记录都“及时”到达。“迟到”记录(同样,在此上下文中没有这样的记录)包含在当前窗口中,但永远不会回装到较早的窗口中。
  • TimeWindows#until()设置窗口保留时间的调用是保留时间的下限。Kafka 可能会比配置的保留时间保留一个窗口“更长一点”(我在这里故意模糊,见下文)。出于这个原因,像@Hartimer 这样的严格测试可能不会产生直觉上预期的结果。

幕后实际发生的关于窗口保留时间是下限的事情有点棘手(可能超出了这个问题的范围),所以我推迟试图解释这一点,除非有具体要求我这样做.

更新:另外,问题片段中的这段代码甚至不应该工作,因为它应该抛出IllegalArgumentException

TimeWindows.of(TimeUnit.HOURS.toMillis(1L))
           .until(TimeUnit.SECONDS.toMillis(1L))
Run Code Online (Sandbox Code Playgroud)

要求是,对于它们各自的输入参数,until() >= of(). 您不能定义大小为 1 小时但保留期仅为 1 秒的窗口(此处保留必须 >= 1 小时)。

更新 2:幕后发生的事情是 的设置TimeWindows#until()用于创建/管理本地窗口存储的段文件。只要窗口段存在,就会接受该窗口的迟到记录。我将跳过有关如何删除/过期段的部分,因为我真的需要深入研究代码(我不知道)。