小编sun*_*azz的帖子

Kafka Streams:如何更改记录时间戳(0.11.0)?

我正在使用FluentD(第12版稳定版)向Kafka发送消息.但是FluentD使用旧的KafkaProducer,因此记录时间戳始终设置为-1.因此,当消息到达kafka时,我必须使用WallclockTimestampExtractor将记录的时间戳设置为时间点.

我真正感兴趣的时间戳是由流利的信息发送的:

"时间戳": "1507885936", "主人": "VXYZ"

卡夫卡的记录表示:

offset = 0,timestamp = - 1,key = null,value = {"timestamp":"1507885936","host":"VXYZ"}

我想在卡夫卡有这样的记录:

offset = 0,timestamp = 1507885936,key = null,value = {"timestamp":"1507885936","host":"VXYZ"}

我的解决方法如下: - 编写一个消费者来提取时间戳(https://kafka.apache.org/0110/javadoc/org/apache/kafka/streams/processor/TimestampExtractor.html)

  • 编写一个生产者来生成一个设置了时间戳的新记录(ProducerRecord(String topic,Integer partition,Long timestamp,K key,V value)

我更喜欢KafkaStreams解决方案,如果有的话.

java kafka-consumer-api kafka-producer-api apache-kafka-streams

5
推荐指数
1
解决办法
1359
查看次数