Rod*_*djf 10 java classcastexception avro apache-kafka confluent-schema-registry
当尝试将 record.value() 转换为 java 对象时,我在消费者中遇到了这个异常:
ClassCastException: class org.apache.avro.generic.GenericData$Record cannot be cast to class [...].PublicActivityRecord (org.apache.avro.generic.GenericData$Record and [...].PublicActivityRecord are in unnamed module of loader 'app')
Run Code Online (Sandbox Code Playgroud)
生产者发送 java 对象,它是一个名为 的用户定义类型,如下所示:PublicActivityRecord
ClassCastException: class org.apache.avro.generic.GenericData$Record cannot be cast to class [...].PublicActivityRecord (org.apache.avro.generic.GenericData$Record and [...].PublicActivityRecord are in unnamed module of loader 'app')
Run Code Online (Sandbox Code Playgroud)
此时我可以在调试模式下看到 的值ProducerRecord确实是类型PublicActivityRecord。
在注册表服务器上,我可以在日志中看到发送模式的生产者的 POST 请求:
Registering new schema: subject DEV-INF_9325_activityRecord_01-value, version null, id null, type null, schema size 7294 (io.confluent.kafka.schemaregistry.rest.resources.SubjectVersionsResource:262)
[2022-01-28 07:01:35,575] INFO 192.168.36.30 - - [28/janv./2022:06:01:34 +0000] "POST /subjects/DEV-INF_9325_activityRecord_01-value/versions HTTP/1.1" 200 8 "-" "Java/11.0.2" POSTsT (io.confluent.rest-utils.requests:62)
Run Code Online (Sandbox Code Playgroud)
在消费者方面:
KafkaProducer<String, PublicActivityRecord> producer = new KafkaProducer<>(createKafkaProperties());
[...]
this.producer.send(new ProducerRecord<String, PublicActivityRecord>(myTopic, activityRecord));
this.producer.flush();
Run Code Online (Sandbox Code Playgroud)
这里发生了ClassCastException。
在调试模式下,我可以看到record.value确实是类型GenericData$Record。并且它不能转换为 PublicActivityRecord。
序列化器/反序列化器的键和值是相同的:
key.deserializer=org.apache.kafka.common.serialization.StringDeserializer
value.deserializer=io.confluent.kafka.serializers.KafkaAvroDeserializer
Run Code Online (Sandbox Code Playgroud)
在 schema-registry 日志中,我可以看到消费者的 GET 请求:
"GET /schemas/ids/3?fetchMaxId=false HTTP/1.1" 200 8447 "-" "Java/11.0.7" GETsT (io.confluent.rest-utils.requests:62)
Run Code Online (Sandbox Code Playgroud)
所以我检查过:
PublicActivityRecordGenericData$Record这导致我得出的结果是我的消费者出了问题。
那么问题来了:为什么消费者得到的是记录GenericData而不是预期的PublicActivityRecord呢?
任何线索将不胜感激!
cri*_*007 22
默认情况下,仅返回通用记录。你需要设置
value.deserializer.specific.avro.reader=true
Run Code Online (Sandbox Code Playgroud)
或者,在您的消费者配置中使用常量
KafkaAvroDeserializerConfig.SPECIFIC_AVRO_READER_CONFIG= 真
| 归档时间: |
|
| 查看次数: |
10575 次 |
| 最近记录: |