Ram*_*mez 5 java avro apache-kafka apache-kafka-streams
简要说明我想要实现的目标:我想对 avro 记录的 kafka 流拓扑(使用 TopologyTestDriver)进行功能测试。
问题:无法“模拟”schemaRegistry 以自动化模式发布/读取
到目前为止我尝试的是使用 MockSchemaRegistryClient 尝试模拟 schemaRegistry,但我不知道如何将其链接到 Avro Serde。
代码
public class SyncronizerIntegrationTest {
private ConsumerRecordFactory<String, Tracking> recordFactory = new ConsumerRecordFactory<>(new StringSerializer(), new SpecificAvroSerializer<>());
MockSchemaRegistryClient mockSchemaRegistryClient = new MockSchemaRegistryClient();
@Test
void integrationTest() throws IOException, RestClientException {
Properties props = new Properties();
props.setProperty(StreamsConfig.APPLICATION_ID_CONFIG, "streamsTest");
props.setProperty(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "dummy:1234");
props.setProperty(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
props.setProperty(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, GenericAvroSerde.class.getName());
props.setProperty(KafkaAvroSerializerConfig.SCHEMA_REGISTRY_URL_CONFIG, "http://mock:8081"); //Dunno if this do anything? :/
StreamsBuilder kStreamBuilder = new StreamsBuilder();
Serde<Tracking> avroSerde = getAvroSerde();
mockSchemaRegistryClient.register(Tracking.getClassSchema().getName(), Tracking.getClassSchema());
KStream<String, Tracking> unmappedOrdersStream = kStreamBuilder.stream(
"topic",
Consumed.with(Serdes.String(), avroSerde));
unmappedOrdersStream
.filter((k, v) -> v != null).to("ouput");
Topology topology = kStreamBuilder.build();
TopologyTestDriver testDriver = new TopologyTestDriver(topology, props);
testDriver.pipeInput(recordFactory.create("topic", "1", createValidMappedTracking()));
}
}
Run Code Online (Sandbox Code Playgroud)
AvroSerde方法
private <T extends SpecificRecord> Serde<T> getAvroSerde() {
// Configure Avro ser/des
final Map<String,String> avroSerdeConfig = new HashMap<>();
avroSerdeConfig.put(KafkaAvroSerializerConfig.SCHEMA_REGISTRY_URL_CONFIG, "http://mock:8081");
final Serde<T> avroSerde = new SpecificAvroSerde<>();
avroSerde.configure(avroSerdeConfig, false); // `false` for record values
return avroSerde;
}
Run Code Online (Sandbox Code Playgroud)
如果我运行测试但testDriver.pipeInput(recordFactory.create("topic", "1", createValidMappedTracking()));效果很好(看起来一切都已正确解决)
但
当我尝试插入数据(pipelineInput)时,它抛出以下异常:对象“Tracking”已满。
org.apache.kafka.common.errors.SerializationException: Error serializing Avro message
Caused by: java.lang.NullPointerException
at io.confluent.kafka.serializers.AbstractKafkaAvroSerializer.serializeImpl(AbstractKafkaAvroSerializer.java:82)
at io.confluent.kafka.serializers.KafkaAvroSerializer.serialize(KafkaAvroSerializer.java:53)
at io.confluent.kafka.streams.serdes.avro.SpecificAvroSerializer.serialize(SpecificAvroSerializer.java:65)
at io.confluent.kafka.streams.serdes.avro.SpecificAvroSerializer.serialize(SpecificAvroSerializer.java:38)
at org.apache.kafka.streams.test.ConsumerRecordFactory.create(ConsumerRecordFactory.java:184)
at org.apache.kafka.streams.test.ConsumerRecordFactory.create(ConsumerRecordFactory.java:270)
Run Code Online (Sandbox Code Playgroud)
编辑后,我没有删除它,因为“历史日志”提供了遵循的路径。
Confluence 提供了大量的示例代码,用于测试 Kafka(流)以及 Schema 注册表。
最重要的是,模拟并不是完整的集成测试 - 使用内存模式注册表启动实际的 Kafka 代理才是完整的集成测试。
在上面的代码中,请参阅
@ClassRule
public static final EmbeddedSingleNodeKafkaCluster CLUSTER = new EmbeddedSingleNodeKafkaCluster();
Run Code Online (Sandbox Code Playgroud)
和
streamsConfiguration.put(AbstractKafkaAvroSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG, CLUSTER.schemaRegistryUrl());
Run Code Online (Sandbox Code Playgroud)
| 归档时间: |
|
| 查看次数: |
5960 次 |
| 最近记录: |