Kafka - TimestampExtractor的问题

eth*_*nny 7 java apache-kafka apache-kafka-streams

我用 org.apache.kafka:kafka-streams:0.10.0.1

我正在尝试使用基于时间序列的流,似乎没有触发KStream.Process()触发("标点符号").(见此处参考)

KafkaStreams配置中,我传递了这个参数(以及其他):

config.put(
  StreamsConfig.TIMESTAMP_EXTRACTOR_CLASS_CONFIG,
  EventTimeExtractor.class.getName());
Run Code Online (Sandbox Code Playgroud)

EventTimeExtractor是一个自定义时间戳提取器(实现org.apache.kafka.streams.processor.TimestampExtractor),用于从JSON数据中提取时间戳信息.

我希望这可以TimestampExtractor在每个新记录被拉入时调用我的对象(派生自).所讨论的流是2*10 ^ 6条记录/分钟.我已经punctuate()设定为60秒,它永远不会发射.我知道数据非常频繁地传递了这个范围,因为它将旧的值拉回来.

实际上它永远不会被调用.

  • 这是在KStream记录上设置时间戳的错误方法吗?
  • 这是否是声明此配置的错误方法?

Mic*_*oll 8

2017年11月更新: Kafka 1.0中的Kafka Streams现在支持punctuate()流时间和处理时间(挂钟时间)行为.所以你可以选择你喜欢的任何行为.

您的设置对我来说似乎是对的.

您需要注意的事项:从Kafka 0.10.0开始,该punctuate()方法在流时间上运行(默认情况下,即基于默认时间戳提取器,流时间将表示事件时间).并且流时间仅在新数据记录进入时提前,并且流时间提前多少由这些新记录的相关时间戳确定.

例如:

  • 假设您已设置punctuate()为每1分钟调用= 60 * 1000(注意:流时间为 1分钟).现在,如果在接下来的5分钟内没有收到任何数据,punctuate()则根本不会被调用 - 即使您可能期望它被调用5次.为什么?同样,因为punctuate()依赖于流时间,并且流时间仅基于新接收的数据记录而提前.

这可能导致你看到的行为?

展望未来:Kafka项目中已经有一个关于如何punctuate()更灵活的讨论,例如不仅基于stream-time(默认为event-time)而且基于此触发它processing-time.


eth*_*nny 1

我认为这是经纪人层面问题的另一个例子。我使用具有更多 CPU 和 RAM 的实例重建了集群。现在我得到了我预期的结果。

远程观察者请注意:如果您的KStream应用程序表现异常,请查看您的代理并确保它们没有陷入 GC,并且有足够的“空间”用于文件句柄、RAM 等。

也可以看看