Val*_*onn 5 java apache-kafka apache-kafka-streams
我想我误解了 TopologyTestDriver 的某些内容。
到目前为止,当我开发 Kafka Stream 应用程序时,我会在测试环境中连接到真实的 Kafka,手动创建并提供内主题,并检查外主题中的内容。
为了自动化测试,我查看了TopologyTestDriver. 根据Confluence发布的文档(https://docs.confluence.io/current/streams/developer-guide/test-streams.html),这似乎就是我正在寻找的:
[...]通常,您使用 KafkaStreams 类运行拓扑,该类连接到您的代理[...]。对于测试来说,运行代理[...]会增加很多复杂性和时间。
Streams 提供了 KafkaStreams 类的 TopologyTestDriver 替代品。它没有外部系统依赖性[...]有用于验证发送到输出主题的数据的挂钩[...]。
我关心的是拓扑的位置。在我看到的每个示例中(在 Confluence 上、在 Kafka 网站上……),拓扑都是在测试文件中定义的:
private TopologyTestDriver testDriver;
....
@Before
public void setup() {
Topology topology = new Topology();
topology.addSource("sourceProcessor", "input-topic");
.......
// setup test driver
Properties config = new Properties();
....
testDriver = new TopologyTestDriver(topology, config);
// pre-populate store
.....
}
@After
public void tearDown() {
testDriver.close();
}
@Test
public void shouldFlushStoreForFirstInput() {
testDriver.pipeInput(recordFactory.create("input-topic", "a", 1L, 9999L));
OutputVerifier.compareKeyValue(......);
Assert.assertNull(........);
}
Run Code Online (Sandbox Code Playgroud)
如何对现有的 Kafka Streams 应用程序实施测试?我无法想象我必须复制拓扑的代码 - 主文件中的一个版本和测试文件中的一个版本。维护起来会很困难。
我现有的应用程序直接在方法中定义了拓扑main。我可以通过添加一个静态方法(如 )从主程序中提取拓扑设计protected static org.apache.kafka.streams.Topology buildTopology() {...},并且我可以从测试文件中调用此方法。
但是没有更干净的解决方案来从测试代码执行主代码吗?