gno*_*nos 6 java unit-testing windowing apache-kafka-streams
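I am using Kafka Streams 2.1.
I am trying to write some tests for a streams application that aggregates events by key (i.e. by correlation ID), using a session window with an inactivity gap of 300 ms.
Here is the aggregation implementation, expressed as a method: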
private static final int INACTIVITY_GAP = 300;
public KStream<String, AggregatedCustomObject> aggregate(KStream<String, CustomObject> source) {
    return source
        // group by key (i.e. by correlation ID)
        .groupByKey(Grouped.with(Serdes.String(), new CustomSerde()))
        // define a session window with an inactivity gap of 300 ms
        .windowedBy(SessionWindows.with(Duration.ofMillis(INACTIVITY_GAP)).grace(Duration.ofMillis(INACTIVITY_GAP)))
        .aggregate(
            // initializer
            () -> new AggregatedCustomObject(),
            // aggregates records in the same session
            (s, customObject, aggCustomObject) -> {
                // ...
                return aggCustomObject;
            },
            // merges sessions
            (s, aggCustomObject1, aggCustomObject2) -> {
                // ...
                return aggCustomObject2;
            },
            Materialized.with(Serdes.String(), new AggCustomObjectSerde())
        )
        .suppress(Suppressed.untilWindowCloses(unbounded()))
        .toStream()
        .selectKey((stringWindowed, aggCustomObject) -> "someKey");
}
This stream process works as expected. Unit testing it, however, is a different story.
My test stream configuration looks like this:
// ...
props.setProperty(StreamsConfig.APPLICATION_ID_CONFIG, "test");
props.setProperty(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "dummy:1234");
props.setProperty(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, myCustomObjectSerde.getClass());
// disable cache
props.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 0);
// commit ASAP
props.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 0);
StreamsBuilder builder = new StreamsBuilder();
aggregate(builder.stream(INPUT_TOPIC), OUTPUT_TOPIC, new AggCustomObjectSerde())
    .to(OUTPUT_TOPIC);
Topology topology = builder.build();
TopologyTestDriver testDriver = new TopologyTestDriver(topology, props);
ConsumerRecordFactory<String, MyCustomObject> factory = new ConsumerRecordFactory<>(INPUT_TOPIC, new StringSerializer(), myCustomSerializer);
// ...
A test looks like this:
List<ConsumerRecord<byte[], byte[]>> records = myCustomMessages.stream()
    .map(myCustomMessage -> factory.create(INPUT_TOPIC, myCustomMessage.correlationId, myCustomMessage))
    .collect(Collectors.toList());
testDriver.pipeInput(records);
ProducerRecord<String, AggregatedCustomMessage> record = testDriver.readOutput(OUTPUT_TOPIC, new StringDeserializer(), myAggregatedCustomObjectSerde);
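The problem is that record is always null. I have tried many things, including the test driver's advanceWallClockTime method, but nothing helped.
Can someone tell me what I am missing, and how to test a streams application based on session windows?
Thanks a lot.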
SessionWindows work on event time, not wall-clock time. Try setting the event time of your records explicitly to simulate the inactivity gap. Something like:
testDriver.pipeInput(factory.create(INPUT_TOPIC, key1, record1, eventTimeMs));
testDriver.pipeInput(factory.create(INPUT_TOPIC, key2, record2, eventTimeMs + inactivityGapMs));
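To make the event-time point concrete, here is a minimal plain-Java sketch of how record timestamps alone decide where one session ends and the next begins for a single key. It is only an illustration, not the Kafka Streams implementation; the class name SessionGapSketch and the method sessionize are made up for this example, and treating a gap of exactly gapMs as still belonging to the same session is an assumption.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Plain-Java illustration only; this is NOT how Kafka Streams is implemented.
// SessionGapSketch and sessionize are hypothetical names for this example.
class SessionGapSketch {

    /**
     * Splits the event timestamps of ONE key (sorted ascending, in ms) into sessions.
     * A new session starts whenever the distance to the previous event exceeds gapMs
     * (assumption: a distance of exactly gapMs still belongs to the same session).
     */
    static List<List<Long>> sessionize(List<Long> sortedTimestampsMs, long gapMs) {
        List<List<Long>> sessions = new ArrayList<>();
        List<Long> current = new ArrayList<>();
        for (long ts : sortedTimestampsMs) {
            if (!current.isEmpty() && ts - current.get(current.size() - 1) > gapMs) {
                // Inactivity gap exceeded: close the running session and start a new one.
                sessions.add(current);
                current = new ArrayList<>();
            }
            current.add(ts);
        }
        if (!current.isEmpty()) {
            sessions.add(current);
        }
        return sessions;
    }

    public static void main(String[] args) {
        long gapMs = 300L;
        // 0, 100 and 250 ms are each within 300 ms of the previous event; 1000 ms is not.
        System.out.println(sessionize(Arrays.asList(0L, 100L, 250L, 1000L), gapMs));
        // prints [[0, 100, 250], [1000]]
    }
}
```

Nothing in this logic looks at the system clock, which is why advancing wall-clock time in the test driver has no effect on the sessions. Note also that sessions are tracked per key, so two records with different keys never end up in the same session.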
But first, you need a custom TimestampExtractor, for example:
public static class RecordTimestampExtractor implements TimestampExtractor {
    @Override
    public long extract(ConsumerRecord<Object, Object> record, long previousTimestamp) {
        return record.timestamp();
    }
}
It must be registered like this:
streamProperties.setProperty(
    StreamsConfig.DEFAULT_TIMESTAMP_EXTRACTOR_CLASS_CONFIG,
    RecordTimestampExtractor.class.getName()
);
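One more thing to watch for, given the suppress(Suppressed.untilWindowCloses(...)) in the question's topology: the aggregated result is only emitted once stream time has moved past the point where the session is considered closed. As far as I understand it, for session windows that point is the session end plus the inactivity gap plus the grace period, so with a 300 ms gap and 300 ms grace a later record probably needs a timestamp at least 600 ms after the last event of the session. The sketch below just does that arithmetic; WindowCloseSketch and its methods are hypothetical names, and the exact boundary (>= versus >) is an assumption, so pick a timestamp comfortably beyond it in a real test.

```java
// Plain-Java arithmetic only; the closing rule here is an assumption about how
// suppress(untilWindowCloses) behaves for session windows, not library code.
class WindowCloseSketch {

    /** Assumed earliest stream time (ms) at which a session ending at sessionEndMs is closed. */
    static long closeTimeMs(long sessionEndMs, long gapMs, long graceMs) {
        return sessionEndMs + gapMs + graceMs;
    }

    /** True if a record with timestamp streamTimeMs would be late enough to close the session. */
    static boolean isClosed(long sessionEndMs, long gapMs, long graceMs, long streamTimeMs) {
        return streamTimeMs >= closeTimeMs(sessionEndMs, gapMs, graceMs);
    }

    public static void main(String[] args) {
        long gapMs = 300L;    // INACTIVITY_GAP from the question
        long graceMs = 300L;  // grace period from the question
        long lastEventMs = 250L;

        System.out.println(closeTimeMs(lastEventMs, gapMs, graceMs));        // prints 850
        // A follow-up record only one gap later does not close the session yet...
        System.out.println(isClosed(lastEventMs, gapMs, graceMs, 550L));     // prints false
        // ...while one at (or after) end + gap + grace does, under the assumption above.
        System.out.println(isClosed(lastEventMs, gapMs, graceMs, 850L));     // prints true
    }
}
```

In a test this would mean piping one extra record whose event time lies beyond that close time before calling readOutput, since stream time only advances when records are processed.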