eth*_*nny 7 java apache-kafka apache-kafka-streams
我用 org.apache.kafka:kafka-streams:0.10.0.1
我正在尝试使用基于时间序列的流,似乎没有触发KStream.Process()触发("标点符号").(见此处参考)
在KafkaStreams配置中,我传递了这个参数(以及其他):
config.put(
StreamsConfig.TIMESTAMP_EXTRACTOR_CLASS_CONFIG,
EventTimeExtractor.class.getName());
Run Code Online (Sandbox Code Playgroud)
这EventTimeExtractor是一个自定义时间戳提取器(实现org.apache.kafka.streams.processor.TimestampExtractor),用于从JSON数据中提取时间戳信息.
我希望这可以TimestampExtractor在每个新记录被拉入时调用我的对象(派生自).所讨论的流是2*10 ^ 6条记录/分钟.我已经punctuate()设定为60秒,它永远不会发射.我知道数据非常频繁地传递了这个范围,因为它将旧的值拉回来.
实际上它永远不会被调用.
2017年11月更新: Kafka 1.0中的Kafka Streams现在支持punctuate()流时间和处理时间(挂钟时间)行为.所以你可以选择你喜欢的任何行为.
您的设置对我来说似乎是对的.
您需要注意的事项:从Kafka 0.10.0开始,该punctuate()方法在流时间上运行(默认情况下,即基于默认时间戳提取器,流时间将表示事件时间).并且流时间仅在新数据记录进入时提前,并且流时间提前多少由这些新记录的相关时间戳确定.
例如:
punctuate()为每1分钟调用= 60 * 1000(注意:流时间为 1分钟).现在,如果在接下来的5分钟内没有收到任何数据,punctuate()则根本不会被调用 - 即使您可能期望它被调用5次.为什么?同样,因为punctuate()依赖于流时间,并且流时间仅基于新接收的数据记录而提前.这可能导致你看到的行为?
展望未来:Kafka项目中已经有一个关于如何punctuate()更灵活的讨论,例如不仅基于stream-time(默认为event-time)而且基于此触发它processing-time.