Testing a Kafka Streams topology

ime*_*ehl 8 testing apache-kafka apache-kafka-streams

I am looking for a way to test a Kafka Streams application, so that I can define the input events and the test suite shows me the output.

Is this possible without a real Kafka setup?

Dmi*_*sky 16

Update for Kafka 1.1.0 (released March 23, 2018):

KIP-247 added official test utilities. According to the upgrade guide:

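There is a new artifact kafka-streams-test-utils providing the TopologyTestDriver, ConsumerRecordFactory, and OutputVerifier classes. You can include the new artifact as a regular dependency of your unit tests and use the test driver to test the business logic of your Kafka Streams application. For more details, see KIP-247.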

From the documentation:

<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka-streams-test-utils</artifactId>
    <version>1.1.0</version>
    <scope>test</scope>
</dependency>

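The test driver simulates the library runtime, which continuously fetches records from the input topics and processes them by traversing the topology. You can use the test driver to verify that your processor topology computes the correct results for manually piped-in data records. The test driver captures the result records and lets you query its embedded state stores: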

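import java.util.Properties;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.IntegerDeserializer;
import org.apache.kafka.common.serialization.IntegerSerializer;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.Topology;
import org.apache.kafka.streams.TopologyTestDriver;
import org.apache.kafka.streams.test.ConsumerRecordFactory;

// Create your topology (for illustration: a pass-through from "input-topic" to "output-topic")
Topology topology = new Topology();
topology.addSource("source", "input-topic");
topology.addSink("sink", "output-topic", "source");
Properties config = new Properties();
config.put(StreamsConfig.APPLICATION_ID_CONFIG, "test");
config.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "dummy:1234"); // never contacted by the test driver

// Run it on the test driver
TopologyTestDriver testDriver = new TopologyTestDriver(topology, config);

// Feed input data (the value must be an Integer to match the IntegerSerializer)
ConsumerRecordFactory<String, Integer> factory = new ConsumerRecordFactory<>("input-topic", new StringSerializer(), new IntegerSerializer());
testDriver.pipeInput(factory.create("key", 42));

// Verify output (the deserializers must match the types written to "output-topic")
ProducerRecord<String, Integer> outputRecord = testDriver.readOutput("output-topic", new StringDeserializer(), new IntegerDeserializer());
testDriver.close(); // release the driver's resources, such as state stores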
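To make the mechanism described above concrete without any Kafka dependency, here is a toy model in plain Java. It is not how TopologyTestDriver is implemented; ToyTestDriver and Output are hypothetical names for this sketch, and pipeInput/readOutput only mirror the real method names. It shows the core idea: piping a record runs the "topology" synchronously in the calling thread, and the results are captured per output topic, to be read back in order until the topic is drained.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.BiFunction;

/** Hypothetical toy model of a topology test driver; NOT Kafka's implementation. */
public class ToyTestDriver<K, V> {
    /** One record emitted by the toy topology, tagged with its output topic. */
    public static final class Output<K, V> {
        final String topic;
        final K key;
        final V value;

        public Output(String topic, K key, V value) {
            this.topic = topic;
            this.key = key;
            this.value = value;
        }

        @Override
        public String toString() {
            return topic + ":" + key + "=" + value;
        }
    }

    // The "topology": turns one input record into zero or more output records.
    private final BiFunction<K, V, List<Output<K, V>>> topology;
    // Captured results, one FIFO queue per output topic.
    private final Map<String, Deque<Output<K, V>>> captured = new HashMap<>();

    public ToyTestDriver(BiFunction<K, V, List<Output<K, V>>> topology) {
        this.topology = topology;
    }

    /** Processes the record synchronously: no broker, no background polling thread. */
    public void pipeInput(K key, V value) {
        for (Output<K, V> out : topology.apply(key, value)) {
            captured.computeIfAbsent(out.topic, t -> new ArrayDeque<>()).addLast(out);
        }
    }

    /** Returns the next captured record for a topic, or null once that topic is drained. */
    public Output<K, V> readOutput(String topic) {
        Deque<Output<K, V>> queue = captured.get(topic);
        return queue == null ? null : queue.pollFirst();
    }

    public static void main(String[] args) {
        // Toy topology: double each value and route it to "output-topic".
        ToyTestDriver<String, Integer> driver = new ToyTestDriver<String, Integer>((k, v) -> {
            List<Output<String, Integer>> out = new ArrayList<>();
            out.add(new Output<>("output-topic", k, v * 2));
            return out;
        });
        driver.pipeInput("key", 42);
        System.out.println(driver.readOutput("output-topic")); // output-topic:key=84
        System.out.println(driver.readOutput("output-topic")); // null
    }
}
```

Because everything happens in the calling thread, a test can assert as soon as pipeInput returns; there is no consumer lag or timeout to wait for, which is what makes this style of test deterministic.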

See the documentation for details.


ProcessorTopologyTestDriver is available as of 0.11.0.0. It ships in the kafka-streams test artifact (specified with <classifier>test</classifier> in Maven):

<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka-streams</artifactId>
    <version>0.11.0.0</version>
    <classifier>test</classifier>
    <scope>test</scope>
</dependency>

You also need to add the kafka-clients test artifact:

<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka-clients</artifactId>
    <version>0.11.0.0</version>
    <classifier>test</classifier>
    <scope>test</scope>
</dependency>

Then you can use the test driver. Following its Javadoc, first create a ProcessorTopologyTestDriver:

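import java.util.Properties;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.processor.TopologyBuilder;
import org.apache.kafka.test.ProcessorTopologyTestDriver;

StringSerializer strSerializer = new StringSerializer();
StringDeserializer strDeserializer = new StringDeserializer();
Properties props = new Properties();
props.setProperty(StreamsConfig.APPLICATION_ID_CONFIG, "unit-test"); // required by StreamsConfig
props.setProperty(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9091");
// Optional; CustomTimestampExtractor stands for your own TimestampExtractor implementation
props.setProperty(StreamsConfig.DEFAULT_TIMESTAMP_EXTRACTOR_CLASS_CONFIG, CustomTimestampExtractor.class.getName());
// StreamsConfig takes default serdes rather than separate serializer/deserializer classes
props.setProperty(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.StringSerde.class.getName());
props.setProperty(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.StringSerde.class.getName());
StreamsConfig config = new StreamsConfig(props);
TopologyBuilder builder = ... // your processor topology
ProcessorTopologyTestDriver driver = new ProcessorTopologyTestDriver(config, builder);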

You can feed input into the topology as if you had actually written to one of the input topics:

driver.process("input-topic", "key1", "value1", strSerializer, strSerializer);

And read from the output topics:

ProducerRecord<String, String> record1 = driver.readOutput("output-topic-1", strDeserializer, strDeserializer);
ProducerRecord<String, String> record2 = driver.readOutput("output-topic-1", strDeserializer, strDeserializer);
ProducerRecord<String, String> record3 = driver.readOutput("output-topic-2", strDeserializer, strDeserializer);

Then you can make assertions on these results.
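Each readOutput call consumes one record, so a small helper that keeps reading until it gets null makes it easy to assert on everything a topic received. This is a sketch: DrainOutput and drain are hypothetical names, and it assumes readOutput returns null once no more records are waiting on the topic (check the Javadoc of the driver version you use). The main method uses a plain queue as a stand-in, since Deque.poll also returns null when empty.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Deque;
import java.util.List;
import java.util.function.Supplier;

public class DrainOutput {
    /**
     * Calls readNext until it returns null and collects everything it returned, in order.
     * Assumes the reader signals "no more records" by returning null.
     */
    public static <T> List<T> drain(Supplier<T> readNext) {
        List<T> records = new ArrayList<>();
        for (T r = readNext.get(); r != null; r = readNext.get()) {
            records.add(r);
        }
        return records;
    }

    public static void main(String[] args) {
        // Stand-in for an output topic: poll() returns null once the queue is empty.
        Deque<String> fakeTopic = new ArrayDeque<>(Arrays.asList("key1=value1", "key2=value2"));
        List<String> actual = drain(fakeTopic::poll);
        if (!actual.equals(Arrays.asList("key1=value1", "key2=value2"))) {
            throw new AssertionError("unexpected output: " + actual);
        }
        System.out.println(actual); // [key1=value1, key2=value2]
    }
}
```

With the driver above, the call would be drain(() -> driver.readOutput("output-topic-1", strDeserializer, strDeserializer)), which yields a List<ProducerRecord<String, String>> whose key() and value() can then be compared against the expected data. Draining also lets a test check that no unexpected extra records were produced.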