How do I functionally test Kafka Streams with Avro (Schema Registry)?

Ram*_*mez 5 java avro apache-kafka apache-kafka-streams

  • In short, what I want to achieve: I want to functionally test a Kafka Streams topology (using TopologyTestDriver) over Avro records.

  • The problem: I can't "mock" the schema registry so that publishing/reading schemas is automated.

What I have tried so far is to use a MockSchemaRegistryClient to mock the schema registry, but I don't know how to link it to the Avro Serde.

Code

public class SyncronizerIntegrationTest {


    private ConsumerRecordFactory<String, Tracking> recordFactory = new ConsumerRecordFactory<>(new StringSerializer(), new SpecificAvroSerializer<>());

    MockSchemaRegistryClient mockSchemaRegistryClient = new MockSchemaRegistryClient();


    @Test
    void integrationTest() throws IOException, RestClientException {


        Properties props = new Properties();
        props.setProperty(StreamsConfig.APPLICATION_ID_CONFIG, "streamsTest");
        props.setProperty(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "dummy:1234");
        props.setProperty(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
        props.setProperty(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, GenericAvroSerde.class.getName());
        props.setProperty(KafkaAvroSerializerConfig.SCHEMA_REGISTRY_URL_CONFIG, "http://mock:8081"); //Dunno if this do anything? :/
        StreamsBuilder kStreamBuilder = new StreamsBuilder();
        Serde<Tracking> avroSerde = getAvroSerde();
        mockSchemaRegistryClient.register(Tracking.getClassSchema().getName(), Tracking.getClassSchema());


        KStream<String, Tracking> unmappedOrdersStream = kStreamBuilder.stream(
                "topic",
                Consumed.with(Serdes.String(), avroSerde));

        unmappedOrdersStream
                .filter((k, v) -> v != null).to("ouput");

        Topology topology = kStreamBuilder.build();
        TopologyTestDriver testDriver = new TopologyTestDriver(topology, props);

        testDriver.pipeInput(recordFactory.create("topic", "1", createValidMappedTracking()));

    }
}

The AvroSerde method

private <T extends SpecificRecord> Serde<T> getAvroSerde() {

    // Configure Avro ser/des
    final Map<String,String> avroSerdeConfig = new HashMap<>();
    avroSerdeConfig.put(KafkaAvroSerializerConfig.SCHEMA_REGISTRY_URL_CONFIG, "http://mock:8081");

    final Serde<T> avroSerde = new SpecificAvroSerde<>();
    avroSerde.configure(avroSerdeConfig, false); // `false` for record values
    return avroSerde;
}
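
For reference, SpecificAvroSerde also exposes a constructor that accepts a SchemaRegistryClient, which looks like the hook for linking the MockSchemaRegistryClient in. A minimal sketch under that assumption (the helper name getMockedAvroSerde is made up for illustration; the dummy URL only has to be present in the config map, the mock client is never actually contacted over HTTP):

// Sketch, not the failing test above: pass the mock client straight into the serde,
// so both the topology and the serializer handed to ConsumerRecordFactory use it.
private <T extends SpecificRecord> Serde<T> getMockedAvroSerde(SchemaRegistryClient registryClient) {

    final Map<String, String> avroSerdeConfig = new HashMap<>();
    avroSerdeConfig.put(KafkaAvroSerializerConfig.SCHEMA_REGISTRY_URL_CONFIG, "http://mock:8081");

    final Serde<T> avroSerde = new SpecificAvroSerde<>(registryClient); // mock client goes in here
    avroSerde.configure(avroSerdeConfig, false); // `false` for record values
    return avroSerde;
}

// Usage sketch:
// Serde<Tracking> avroSerde = getMockedAvroSerde(mockSchemaRegistryClient);
// ConsumerRecordFactory<String, Tracking> recordFactory =
//         new ConsumerRecordFactory<>(new StringSerializer(), avroSerde.serializer());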

If I run the test without the line testDriver.pipeInput(recordFactory.create("topic", "1", createValidMappedTracking())); it works fine (it looks like everything is resolved correctly).

But when I try to insert data (pipeInput), it throws the exception below. The 'Tracking' object is fully populated.

org.apache.kafka.common.errors.SerializationException: Error serializing Avro message
Caused by: java.lang.NullPointerException
    at io.confluent.kafka.serializers.AbstractKafkaAvroSerializer.serializeImpl(AbstractKafkaAvroSerializer.java:82)
    at io.confluent.kafka.serializers.KafkaAvroSerializer.serialize(KafkaAvroSerializer.java:53)
    at io.confluent.kafka.streams.serdes.avro.SpecificAvroSerializer.serialize(SpecificAvroSerializer.java:65)
    at io.confluent.kafka.streams.serdes.avro.SpecificAvroSerializer.serialize(SpecificAvroSerializer.java:38)
    at org.apache.kafka.streams.test.ConsumerRecordFactory.create(ConsumerRecordFactory.java:184)
    at org.apache.kafka.streams.test.ConsumerRecordFactory.create(ConsumerRecordFactory.java:270)

Edit: I haven't deleted the above, because the "history log" shows the path that was followed.

cri*_*007 3

Confluent provides plenty of example code for testing Kafka (Streams) together with the Schema Registry.

https://github.com/confluentinc/kafka-streams-examples/blob/5.0.0-post/src/test/java/io/confluent/examples/streams/SpecificAvroIntegrationTest.java

Most importantly, mocking isn't a complete integration test - starting an actual Kafka broker with an in-memory schema registry is.

In the code above, see

@ClassRule
public static final EmbeddedSingleNodeKafkaCluster CLUSTER = new EmbeddedSingleNodeKafkaCluster();

streamsConfiguration.put(AbstractKafkaAvroSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG, CLUSTER.schemaRegistryUrl());
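
A condensed sketch of how the linked test wires this together (it relies on the EmbeddedSingleNodeKafkaCluster utility from the test sources of that same repository, which starts a broker, ZooKeeper and a schema registry in-process; the topic names and test body here are illustrative):

// Sketch modelled on the linked SpecificAvroIntegrationTest.
@ClassRule
public static final EmbeddedSingleNodeKafkaCluster CLUSTER = new EmbeddedSingleNodeKafkaCluster();

@BeforeClass
public static void createTopics() {
    CLUSTER.createTopic("input-topic");   // illustrative names
    CLUSTER.createTopic("output-topic");
}

@Test
public void shouldRoundTripAvroRecords() throws Exception {
    Properties streamsConfiguration = new Properties();
    streamsConfiguration.put(StreamsConfig.APPLICATION_ID_CONFIG, "avro-integration-test");
    // Point Kafka Streams and the Avro serdes at the embedded cluster, so schemas
    // are registered against the in-memory schema registry instead of a mock.
    streamsConfiguration.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, CLUSTER.bootstrapServers());
    streamsConfiguration.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
    streamsConfiguration.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, SpecificAvroSerde.class.getName());
    streamsConfiguration.put(AbstractKafkaAvroSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG, CLUSTER.schemaRegistryUrl());
    // ... build the topology, start KafkaStreams, and produce/consume records with
    // KafkaAvroSerializer/KafkaAvroDeserializer, as the linked example does.
}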