小编avi*_*103的帖子

kafka 流 protobuf 转换异常

我正在使用 Kafka 流来读取和处理 protobuf 消息。

我正在为流使用以下属性:


        Properties properties = new Properties();
        properties.put(ConsumerConfig.GROUP_ID_CONFIG, kafkaConfig.getGroupId());
        properties.put(StreamsConfig.CLIENT_ID_CONFIG, kafkaConfig.getClientId());
        properties.put(StreamsConfig.APPLICATION_ID_CONFIG, kafkaConfig.getApplicationId());
        properties.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaConfig.getBootstrapServers());

        properties.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.StringSerde.class);
        properties.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, KafkaProtobufSerde.class);
        properties.put(AbstractKafkaSchemaSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG, kafkaConfig.getSchemaRegistryUrl());
        properties.put(KafkaProtobufDeserializerConfig.SPECIFIC_PROTOBUF_VALUE_TYPE, ProtobufData.class);
        return properties;
    }
Run Code Online (Sandbox Code Playgroud)

但在运行时我遇到了这个错误:

Caused by: java.lang.ClassCastException: class com.google.protobuf.DynamicMessage cannot be cast to class model.schema.proto.input.ProtobufDataProto$ProtobufData (com.google.protobuf.DynamicMessage and model.schema.proto.input.ProtobufDataProto$ProtobufData are in unnamed module of loader 'app')

我的.proto文件如下所示:

import "inner_data.proto";
package myPackage;

option java_package = "model.schema.proto.input";
option java_outer_classname = "ProtobufDataProto";

message OuterData {
    string timestamp = 1;
    string x = 3;
    repeated InnerObject …
Run Code Online (Sandbox Code Playgroud)

java protocol-buffers apache-kafka-streams confluent-schema-registry

5
推荐指数
1
解决办法
1518
查看次数