小编Val*_*onn的帖子

如何在不复制拓扑的情况下测试 Kafka Stream 应用程序?使用TopologyTestDriver?

我想我误解了 TopologyTestDriver 的某些内容。

到目前为止,当我开发 Kafka Stream 应用程序时,我会在测试环境中连接到真实的 Kafka,手动创建并提供内主题,并检查外主题中的内容。

为了自动化测试,我查看了TopologyTestDriver. 根据Confluence发布的文档(https://docs.confluence.io/current/streams/developer-guide/test-streams.html),这似乎就是我正在寻找的:

[...]通常,您使用 KafkaStreams 类运行拓扑,该类连接到您的代理[...]。对于测试来说,运行代理[...]会增加很多复杂性和时间。

Streams 提供了 KafkaStreams 类的 TopologyTestDriver 替代品。它没有外部系统依赖性[...]有用于验证发送到输出主题的数据的挂钩[...]。

我关心的是拓扑的位置。在我看到的每个示例中(在 Confluence 上、在 Kafka 网站上……),拓扑都是在测试文件中定义的:

private TopologyTestDriver testDriver;
....

@Before
public void setup() {
    Topology topology = new Topology();
    topology.addSource("sourceProcessor", "input-topic");
    .......

    // setup test driver
    Properties config = new Properties();
    ....
    testDriver = new TopologyTestDriver(topology, config);

    // pre-populate store
    .....
}

@After
public void tearDown() {
    testDriver.close();
}

@Test
public void shouldFlushStoreForFirstInput() {
    testDriver.pipeInput(recordFactory.create("input-topic", "a", 1L, …
Run Code Online (Sandbox Code Playgroud)

java apache-kafka apache-kafka-streams

5
推荐指数
0
解决办法
861
查看次数

使用kafka流获取时间窗口中给定键的最后一个事件

我开始使用 KStream 来使用来自现有主题的数据。

我只对在 10 秒窗口内获取给定 ID 的最后一个事件感兴趣。我尝试使用以下代码:

StreamsBuilder builder = new StreamsBuilder();
KStream<String, MySale> stream = builder.stream(INPUT_TOPIC, Consumed.with(Serdes.String(), specificAvroSerde));

stream.selectKey((key, value) -> value.getID())
    .groupByKey()
    .windowedBy(TimeWindows.of(Duration.ofSeconds(10)))
    .reduce((value1, value2) -> value2)
    .toStream()
    .peek((key, value) -> log.info("key={}, value={}", key, value.toString()))
    .to(OUTPUT_TOPIC, Produced.with(Serdes.String(), specificAvroSerde));
Run Code Online (Sandbox Code Playgroud)

但我最终得到了所有事件,而不仅仅是最后一个。使用 KStream 可以做我想做的事吗?

java dsl stream apache-kafka apache-kafka-streams

1
推荐指数
1
解决办法
1283
查看次数

标签 统计

apache-kafka ×2

apache-kafka-streams ×2

java ×2

dsl ×1

stream ×1