sun*_*azz 5 java kafka-consumer-api kafka-producer-api apache-kafka-streams
我正在使用FluentD(第12版稳定版)向Kafka发送消息.但是FluentD使用旧的KafkaProducer,因此记录时间戳始终设置为-1.因此,当消息到达kafka时,我必须使用WallclockTimestampExtractor将记录的时间戳设置为时间点.
我真正感兴趣的时间戳是由流利的信息发送的:
"时间戳": "1507885936", "主人": "VXYZ"
卡夫卡的记录表示:
offset = 0,timestamp = - 1,key = null,value = {"timestamp":"1507885936","host":"VXYZ"}
我想在卡夫卡有这样的记录:
offset = 0,timestamp = 1507885936,key = null,value = {"timestamp":"1507885936","host":"VXYZ"}
我的解决方法如下: - 编写一个消费者来提取时间戳(https://kafka.apache.org/0110/javadoc/org/apache/kafka/streams/processor/TimestampExtractor.html)
我更喜欢KafkaStreams解决方案,如果有的话.
您可以编写一个非常简单的Kafka Streams应用程序,例如:
KStreamBuilder builder = new KStreamBuilder();
builder.stream("input-topic").to("output-topic");
Run Code Online (Sandbox Code Playgroud)
并使用TimestampExtractor从记录中提取时间戳的自定义配置应用程序并将其返回.
在将记录写回Kafka时,Kafka Streams将使用返回的时间戳.
注意:如果您有乱序数据 - 即时间戳没有严格排序 - 结果也将包含乱序时间戳.Kafka Streams使用返回的时间戳写回Kafka(即,无论提取器返回什么,都用作记录元数据时间戳).请注意,在写入时,当前处理的输入记录的时间戳用于所有生成的输出记录 - 这适用于版本1.0,但在将来的版本中可能会更改.).
| 归档时间: |
|
| 查看次数: |
1359 次 |
| 最近记录: |