我正在构建一个非常简单的KafkaStreams演示应用程序,以测试一个用例。
我无法升级我正在使用的Kafka代理(当前版本为0.10.0),并且有0.10.0之前的生产者编写的一些消息,因此我使用的是自定义TimestampExtractor,我将其添加为我主类开始时的默认配置:
config.put(StreamsConfig.DEFAULT_TIMESTAMP_EXTRACTOR_CLASS_CONFIG, GenericRecordTimestampExtractor.class);
Run Code Online (Sandbox Code Playgroud)
从我的源主题中消费时,此方法可以很好地工作。但是,当使用聚合运算符时,由于在使用内部聚合主题时使用的FailOnInvalidTimestamp实现TimestampExtractor而不是自定义实现,因此遇到了一个异常。
Streams应用程序的代码如下所示:
...
KStream<String, MyValueClass> clickStream = streamsBuilder
.stream("mytopic", Consumed.with(Serdes.String(), valueClassSerde));
KTable<Windowed<Long>, Long> clicksByCustomerId = clickStream
.map(((key, value) -> new KeyValue<>(value.getId(), value)))
.groupByKey(Serialized.with(Serdes.Long(), valueClassSerde))
.windowedBy(TimeWindows.of(TimeUnit.MINUTES.toMillis(1)))
.count();
...
Run Code Online (Sandbox Code Playgroud)
我遇到的异常如下:
Exception in thread "click-aggregator-b9d77f2e-0263-4fa3-bec4-e48d4d6602ab-StreamThread-1" org.apache.kafka.streams.errors.StreamsException:
Input record ConsumerRecord(topic = click-aggregator-KSTREAM-AGGREGATE-STATE-STORE-0000000002-repartition, partition = 9, offset = 0, CreateTime = -1, serialized key size = 8, serialized value size = 652, headers = RecordHeaders(headers = [], isReadOnly = false), key = 11230, value = …Run Code Online (Sandbox Code Playgroud)