mar*_*hon 5 apache-kafka apache-kafka-streams
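My goal with this topology is to window the incoming messages, count them, and then send the counts to another topic.
When I test this with a single key and one or more values on the input topic, I get inconsistent results. Sometimes the count is correct. Sometimes I send a single message, see that message in the first peek, and then get some other value, rather than a count of 1, in the second peek and in the output topic. When I send multiple messages, the count is usually correct but occasionally wrong. I am careful to send the messages within the time window, so I don't think they are being split across two windows.
Is my topology flawed?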
public static final String INPUT_TOPIC = "test-topic";
public static final String OUTPUT_TOPIC = "test-output-topic";

public static void buildTopo(StreamsBuilder builder) {
    WindowBytesStoreSupplier store = Stores.persistentTimestampedWindowStore(
            "my-state-store",
            Duration.ofDays(1),
            Duration.ofMinutes(1),
            false);

    Materialized<String, Long, WindowStore<Bytes, byte[]>> materialized = Materialized
            .<String, Long>as(store)
            .withKeySerde(Serdes.String());

    Suppressed<Windowed> suppression = Suppressed
            .untilWindowCloses(Suppressed.BufferConfig.unbounded());

    TimeWindows window = TimeWindows
            .of(Duration.ofMinutes(1))
            .grace(Duration.ofSeconds(0));

    // windowedKey has a string plus the kafka time window
    builder.stream(INPUT_TOPIC, Consumed.with(Serdes.String(), Serdes.String()))
            .peek((key, value) -> System.out.println("****key = " + key + " value= " + value))
            .groupByKey()
            .windowedBy(window)
            .count(materialized)
            .suppress(suppression)
            .toStream()
            .peek((key, value) -> System.out.println("key = " + key + " value= " + value))
            .map((key, value) -> new KeyValue<>(key.key(), value))
            .to(OUTPUT_TOPIC, Produced.with(Serdes.String(), Serdes.Long()));
}
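
On the assumption that messages sent within one minute of each other share a window: Kafka Streams tumbling windows are aligned to the epoch, not to the arrival of the first record, so this may be worth checking. The plain-Java sketch below does not use Kafka at all. It only shows the arithmetic, and the class name WindowAlignment, the helper windowStart, and the example timestamps are made up for illustration. It also assumes the record timestamps are the ones used for windowing, which is the usual case but depends on the configured timestamp extractor.

```java
import java.time.Duration;
import java.time.Instant;

public class WindowAlignment {

    // Start of the epoch-aligned tumbling window containing timestampMs
    // (hypothetical helper; valid for non-negative timestamps).
    static long windowStart(long timestampMs, long sizeMs) {
        return timestampMs - (timestampMs % sizeMs);
    }

    public static void main(String[] args) {
        long sizeMs = Duration.ofMinutes(1).toMillis();

        // Two example timestamps only 20 seconds apart.
        long first = Instant.parse("2021-01-01T00:00:50Z").toEpochMilli();
        long second = first + 20_000L;

        long firstWindow = windowStart(first, sizeMs);
        long secondWindow = windowStart(second, sizeMs);

        System.out.println("first record window starts at  " + Instant.ofEpochMilli(firstWindow));
        System.out.println("second record window starts at " + Instant.ofEpochMilli(secondWindow));
        System.out.println("same window? " + (firstWindow == secondWindow));
    }
}
```

With these example values the two records land in different one-minute windows even though they are less than a minute apart, because the minute boundary falls between them. Logging key.window() in the second peek should show which window each emitted count belongs to.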