如何让KafkaProducer使用模拟Schema Registry进行测试?

Gli*_*ide 13 apache-kafka confluent

KafkaProducer在我的测试用例中使用,我的生产者使用schemaRegistryUrl指向我本地实例的Schema Registry.有没有办法模拟KafkaProducer与Schema Registry的连接方式?也就是说,让KafkaProducer/Consumer我的测试在没有运行Schema Registry的实例的情况下工作.

Gar*_*Pan 13

https://github.com/confluentinc/schema-registry/blob/master/avro-serializer/src/main/java/io/confluent/kafka/serializers/AbstractKafkaAvroSerDe.java

在 5.3.x 中,您可以找到 MOCK_URL_PREFIX = "mock://",因此只需使用前缀“mock://”设置测试 schemaRegistryUrl,例如:“mock://testurl”。

  • github 网址已移动。你能更新一下吗?顺便说一句,只需为我的 schemaRegistryUrl 属性添加 mock://testurl 就很有魅力! (2认同)

csc*_*can 8

绝对有。KafkaAvroSerializer和KafkaAvroDeserializer都有一个接受SchemaRegistryClient的构造函数。您可以将MockSchemaRegistryClient用作SchemaRegistryClient。这是显示如何执行此操作的代码段:

private MockSchemaRegistryClient mockSchemaRegistryClient = new MockSchemaRegistryClient();
private String registryUrl = "unused";

public <T> Serde<T> getAvroSerde(boolean isKey) {
    return Serdes.serdeFrom(getSerializer(isKey), getDeserializer(isKey));
}

private <T> Serializer<T> getSerializer(boolean isKey) {
    Map<String, Object> map = new HashMap<>();
    map.put(KafkaAvroDeserializerConfig.AUTO_REGISTER_SCHEMAS, true);
    map.put(KafkaAvroDeserializerConfig.SCHEMA_REGISTRY_URL_CONFIG, registryUrl);
    Serializer<T> serializer = (Serializer) new KafkaAvroSerializer(mockSchemaRegistryClient);
    serializer.configure(map, isKey);
    return serializer;
}

private <T> Deserializer<T> getDeserializer(boolean key) {
    Map<String, Object> map = new HashMap<>();
    map.put(KafkaAvroDeserializerConfig.SPECIFIC_AVRO_READER_CONFIG, "true");
    map.put(KafkaAvroDeserializerConfig.SCHEMA_REGISTRY_URL_CONFIG, registryUrl);
    Deserializer<T> deserializer = (Deserializer) new KafkaAvroDeserializer(mockSchemaRegistryClient);
    deserializer.configure(map, key);
    return deserializer;
}
Run Code Online (Sandbox Code Playgroud)