Kafka Streams:用于聚合的自定义TimestampExtractor

hau*_*keh 5 apache-kafka apache-kafka-streams

我正在构建一个非常简单的KafkaStreams演示应用程序,以测试一个用例。

我无法升级我正在使用的Kafka代理(当前版本为0.10.0),并且有0.10.0之前的生产者编写的一些消息,因此我使用的是自定义TimestampExtractor,我将其添加为我主类开始时的默认配置:

config.put(StreamsConfig.DEFAULT_TIMESTAMP_EXTRACTOR_CLASS_CONFIG, GenericRecordTimestampExtractor.class);
Run Code Online (Sandbox Code Playgroud)

从我的源主题中消费时,此方法可以很好地工作。但是,当使用聚合运算符时,由于在使用内部聚合主题时使用的FailOnInvalidTimestamp实现TimestampExtractor而不是自定义实现,因此遇到了一个异常。

Streams应用程序的代码如下所示:

...

KStream<String, MyValueClass> clickStream = streamsBuilder
              .stream("mytopic", Consumed.with(Serdes.String(), valueClassSerde));

KTable<Windowed<Long>, Long> clicksByCustomerId = clickStream
              .map(((key, value) -> new KeyValue<>(value.getId(), value)))
              .groupByKey(Serialized.with(Serdes.Long(), valueClassSerde))
              .windowedBy(TimeWindows.of(TimeUnit.MINUTES.toMillis(1)))
              .count();
...
Run Code Online (Sandbox Code Playgroud)

我遇到的异常如下:

    Exception in thread "click-aggregator-b9d77f2e-0263-4fa3-bec4-e48d4d6602ab-StreamThread-1" org.apache.kafka.streams.errors.StreamsException: 
Input record ConsumerRecord(topic = click-aggregator-KSTREAM-AGGREGATE-STATE-STORE-0000000002-repartition, partition = 9, offset = 0, CreateTime = -1, serialized key size = 8, serialized value size = 652, headers = RecordHeaders(headers = [], isReadOnly = false), key = 11230, value = org.example.MyValueClass@2a3f2ea2) has invalid (negative) timestamp. 
Possibly because a pre-0.10 producer client was used to write this record to Kafka without embedding a timestamp, or because the input topic was created before upgrading the Kafka cluster to 0.10+. Use a different TimestampExtractor to process this data.
Run Code Online (Sandbox Code Playgroud)

现在的问题是:TimestampExtractor从内部聚合主题中读取时,我有什么方法可以使Kafka Streams使用自定义项(最好是在仍然使用Streams DSL的情况下)?

Mat*_*Sax 5

你不能改变的时间戳提取器(作为v1.0.0)。出于正确性原因,不允许这样做。

但是我真的很想知道,首先如何将时间戳为-1的记录写入该主题。Kafka Streams在写入记录时会使用您的自定义提取器提供的时间戳。还要注意,KafkaProducer不允许写负时间戳记录。

因此,我能想到的唯一解释是,其他生产者确实在重新分区主题中写过-这是不允许的...只有Kafka Streams应该在重新分区主题中写。

我想,您将需要删除此主题,并让Kafka Streams重新创建它,以使其恢复到干净的状态。

从其他答案的讨论/评论中:

您需要使用0.10+格式才能使用Kafka Streams。如果您升级经纪人并使用0.9或更旧的格式,则Kafka Streams可能无法正常工作。