Gli*_*ide 13 apache-kafka confluent
我KafkaProducer在我的测试用例中使用,我的生产者使用schemaRegistryUrl指向我本地实例的Schema Registry.有没有办法模拟KafkaProducer与Schema Registry的连接方式?也就是说,让KafkaProducer/Consumer我的测试在没有运行Schema Registry的实例的情况下工作.
Gar*_*Pan 13
在 5.3.x 中,您可以找到 MOCK_URL_PREFIX = "mock://",因此只需使用前缀“mock://”设置测试 schemaRegistryUrl,例如:“mock://testurl”。
绝对有。KafkaAvroSerializer和KafkaAvroDeserializer都有一个接受SchemaRegistryClient的构造函数。您可以将MockSchemaRegistryClient用作SchemaRegistryClient。这是显示如何执行此操作的代码段:
private MockSchemaRegistryClient mockSchemaRegistryClient = new MockSchemaRegistryClient();
private String registryUrl = "unused";
public <T> Serde<T> getAvroSerde(boolean isKey) {
return Serdes.serdeFrom(getSerializer(isKey), getDeserializer(isKey));
}
private <T> Serializer<T> getSerializer(boolean isKey) {
Map<String, Object> map = new HashMap<>();
map.put(KafkaAvroDeserializerConfig.AUTO_REGISTER_SCHEMAS, true);
map.put(KafkaAvroDeserializerConfig.SCHEMA_REGISTRY_URL_CONFIG, registryUrl);
Serializer<T> serializer = (Serializer) new KafkaAvroSerializer(mockSchemaRegistryClient);
serializer.configure(map, isKey);
return serializer;
}
private <T> Deserializer<T> getDeserializer(boolean key) {
Map<String, Object> map = new HashMap<>();
map.put(KafkaAvroDeserializerConfig.SPECIFIC_AVRO_READER_CONFIG, "true");
map.put(KafkaAvroDeserializerConfig.SCHEMA_REGISTRY_URL_CONFIG, registryUrl);
Deserializer<T> deserializer = (Deserializer) new KafkaAvroDeserializer(mockSchemaRegistryClient);
deserializer.configure(map, key);
return deserializer;
}
Run Code Online (Sandbox Code Playgroud)
| 归档时间: |
|
| 查看次数: |
3418 次 |
| 最近记录: |