小编Dav*_*d O的帖子

使用 Kafka Streams 测试窗口聚合

我正在使用 Kafka Streams 的 TopologyTestDriver 来测试我们的数据管道。

它对我们所有的简单拓扑(包括使用 Stores 的有状态拓扑)都非常有效。我的问题是当我尝试使用此测试驱动程序来测试使用窗口聚合的拓扑时。

我复制了一个简单的示例,该示例对 10 秒窗口内使用相同密钥接收到的整数求和。

public class TopologyWindowTests {

TopologyTestDriver testDriver;
String INPUT_TOPIC = "INPUT.TOPIC";
String OUTPUT_TOPIC = "OUTPUT.TOPIC";

@Before
public void setup(){
    Properties config = new Properties();
    config.put(StreamsConfig.APPLICATION_ID_CONFIG, "test");
    config.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "dummy:1234");
    // EventProcessor is a <String,String> processor
    // so we set those serders
    config.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
    config.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.Integer().getClass());
    testDriver = new TopologyTestDriver(defineTopology(),config,0L);
}

/**
 * topology test
 */
@Test
public void testTopologyNoCorrelation() throws IOException {
    ConsumerRecordFactory<String, Integer> factory = new ConsumerRecordFactory<>(INPUT_TOPIC, new StringSerializer(), …
Run Code Online (Sandbox Code Playgroud)

apache-kafka apache-kafka-streams

6
推荐指数
1
解决办法
2511
查看次数

标签 统计

apache-kafka ×1

apache-kafka-streams ×1