使用 Kafka Streams 在输出中设置时间戳

Sté*_* D. 4 apache-kafka apache-kafka-streams

我在 Kafka 主题“原始数据”中获取 CSV,目标是通过使用正确的时间戳(每行不同)发送另一个主题“数据”中的每一行来转换它们。

目前,我有 2 个主播:

  • 一个在“原始数据”中拆分行,将它们发送到“内部”主题(无时间戳)
  • 一个使用TimestampExtractor“内部”并将它们发送到“数据”。

我想通过直接设置时间戳来删除这个“内部”主题的使用,但我找不到方法(时间戳提取器仅在消费时使用)。

我在文档中偶然发现了这一行:

请注意,可以通过在调用 #forward() 时显式地为输出记录分配时间戳来更改处理器 API 中的描述默认行为。

但我找不到任何带有时间戳的签名。他们的意思是什么?

你会怎么做?

编辑:明确地说,我有一个 Kafka 主题,其中一条消息包含事件时间和一些值,例如:

2018-01-01,hello 2018-01-02,world (这是一个消息,不是两个)

我想在另一个主题中获取两条消息,并将 Kafka 记录时间戳设置为其事件时间(2018-01-01 和 2018-01-02),而无需中间主题。

Mat*_*Sax 6

为输出设置时间戳需要 Kafka Streams 2.0,并且仅在 Processor API 中受支持。如果您使用 DSL,则可以transform()使用这些 API。

正如您所指出的,您将使用context.forward(). 电话将是:

stream.transform(new TransformerSupplier() {
  public Transformer get() {
    return new Transformer() {
      // omit other methods for brevity
      // you need to get the `context` from `init()`

      public KeyValue transform(K key, V value) {
        // some business logic

        // you can call #forward() as often as you want
        context.forward(newKey, newValue, To.all().withTimestamp(newTimestamp));

        return null; // only return data via context#forward()
      }
    }
  }
});
Run Code Online (Sandbox Code Playgroud)