avi*_*103 5 java protocol-buffers apache-kafka-streams confluent-schema-registry
I am using Kafka Streams to read and process protobuf messages.
I am using the following properties for the stream:
Properties properties = new Properties();
properties.put(ConsumerConfig.GROUP_ID_CONFIG, kafkaConfig.getGroupId());
properties.put(StreamsConfig.CLIENT_ID_CONFIG, kafkaConfig.getClientId());
properties.put(StreamsConfig.APPLICATION_ID_CONFIG, kafkaConfig.getApplicationId());
properties.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaConfig.getBootstrapServers());
properties.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.StringSerde.class);
properties.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, KafkaProtobufSerde.class);
properties.put(AbstractKafkaSchemaSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG, kafkaConfig.getSchemaRegistryUrl());
properties.put(KafkaProtobufDeserializerConfig.SPECIFIC_PROTOBUF_VALUE_TYPE, ProtobufData.class);
return properties;
}
But at runtime I get this error:
Caused by: java.lang.ClassCastException: class com.google.protobuf.DynamicMessage cannot be cast to class model.schema.proto.input.ProtobufDataProto$ProtobufData (com.google.protobuf.DynamicMessage and model.schema.proto.input.ProtobufDataProto$ProtobufData are in unnamed module of loader 'app')
My .proto file looks like this:
import "inner_data.proto";
package myPackage;
option java_package = "model.schema.proto.input";
option java_outer_classname = "ProtobufDataProto";
message OuterData {
string timestamp = 1;
string x = 3;
repeated InnerObject flows = 4;
}
(I have two separate .proto files.)
package myPackage;
option java_package = "model.schema.proto.input";
option java_outer_classname = "InnerDataProto";
message InnerData {
string a = 1;
string b = 2;
string c = 3;
}
I would like to know why Kafka uses DynamicMessage even though I specified the specific protobuf value class in the properties, and how to fix this.
I ran into the same problem while trying to get Kafka Streams working with protobuf.
I solved it by configuring a dedicated KafkaProtobufSerde for the StreamsBuilder and explicitly specifying the class to deserialize into, using the following line: serdeConfig.put(SPECIFIC_PROTOBUF_VALUE_TYPE, ProtobufDataProto.ProtobufData.class.getName());
/*
 * Define a specific Serde for the protobuf event.
 * The type parameter must be the generated message class
 * (ProtobufDataProto.ProtobufData), not the outer wrapper class.
 */
final KafkaProtobufSerde<ProtobufDataProto.ProtobufData> protoSerde = new KafkaProtobufSerde<>();
Map<String, String> serdeConfig = new HashMap<>();
serdeConfig.put(SCHEMA_REGISTRY_URL_CONFIG, registryUrl);
/*
 * Technically, the following line is only required in order to deserialize the object into the
 * generated message class (a GeneratedMessageV3) and NOT into a DynamicMessage:
 * https://developers.google.com/protocol-buffers/docs/reference/java/com/google/protobuf/DynamicMessage
 */
serdeConfig.put(SPECIFIC_PROTOBUF_VALUE_TYPE, ProtobufDataProto.ProtobufData.class.getName());
// false = this serde is used for record values, not keys
protoSerde.configure(serdeConfig, false);
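As a side note, here is a minimal stdlib-only sketch of what ends up in that config map. The nested stand-in classes and the buildSerdeConfig helper are hypothetical, for illustration only, and the two string literals are what I understand the Confluent constants SCHEMA_REGISTRY_URL_CONFIG and SPECIFIC_PROTOBUF_VALUE_TYPE to resolve to (an assumption worth checking against your library version). It mainly shows that getName() on a nested generated class yields the `Outer$Inner` form seen in the stack trace.

```java
import java.util.HashMap;
import java.util.Map;

public class SerdeConfigSketch {

    // Hypothetical stand-ins for the protoc-generated classes. The real ones
    // are generated from the .proto file and implement com.google.protobuf.Message.
    static class ProtobufDataProto {
        static class ProtobufData { }
    }

    // Assumed literal value of AbstractKafkaSchemaSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG
    static final String SCHEMA_REGISTRY_URL_CONFIG = "schema.registry.url";
    // Assumed literal value of KafkaProtobufDeserializerConfig.SPECIFIC_PROTOBUF_VALUE_TYPE
    static final String SPECIFIC_PROTOBUF_VALUE_TYPE = "specific.protobuf.value.type";

    // Hypothetical helper: builds the map that would be passed to serde.configure(map, false).
    static Map<String, String> buildSerdeConfig(String registryUrl, Class<?> valueType) {
        Map<String, String> config = new HashMap<>();
        config.put(SCHEMA_REGISTRY_URL_CONFIG, registryUrl);
        // getName() returns the binary name, so nested classes are joined with '$'
        config.put(SPECIFIC_PROTOBUF_VALUE_TYPE, valueType.getName());
        return config;
    }

    public static void main(String[] args) {
        Map<String, String> config =
                buildSerdeConfig("http://localhost:8081", ProtobufDataProto.ProtobufData.class);
        // Print the class name that the deserializer would be asked to load
        System.out.println(config.get(SPECIFIC_PROTOBUF_VALUE_TYPE));
    }
}
```

The registry URL above is a placeholder. In the real application the map is handed to protoSerde.configure(serdeConfig, false) as in the snippet above.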
Then I can create my input stream and deserialize it:
//Define a Serde for the key
final Serde<byte[]> bytesSerde = Serdes.ByteArray();
//Define the stream
StreamsBuilder streamsBuilder = new StreamsBuilder();
streamsBuilder.stream("inputTopic", Consumed.with(bytesSerde, protoSerde));
/*
 * Add your processing steps here (map, filter, etc.)
 * ...
 */
streamsBuilder.build();