Intermittent wrong counts from a TimeWindowedKStream in Kafka Streams

mar*_*hon 5 apache-kafka apache-kafka-streams

The goal of this topology is to window incoming messages, count them, and send the counts to another topic.

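When I test it with a single key and one or more values on the input topic, I get inconsistent results. Sometimes the count is correct. Sometimes I send one message, see that message in the first `peek`, and then, instead of a count of 1, get some other value in the second `peek` and in the output topic. When I send several messages the count is usually correct, but sometimes it is wrong. I am careful to send the messages within the time window, so I don't think they are being split across two windows.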
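A detail worth checking against that last assumption: tumbling `TimeWindows` are aligned to the epoch (lower bound inclusive, upper bound exclusive), not to the arrival of the first message, and each record is placed by its record timestamp. Two messages sent only 15 seconds apart can therefore still land in two different 1-minute windows. The Java sketch below models only that assignment rule with plain arithmetic; `TumblingWindowSketch`, `windowStart` and `sameWindow` are hypothetical names, not Kafka Streams API.

```java
import java.time.Duration;
import java.time.Instant;

// Hypothetical, simplified model (not Kafka Streams code) of tumbling-window
// assignment: windows are aligned to the Unix epoch, so a record's window
// starts at its timestamp rounded down to a multiple of the window size.
final class TumblingWindowSketch {

    // Start of the [start, start + size) window containing a non-negative timestamp.
    static long windowStart(long timestampMs, long sizeMs) {
        return timestampMs - (timestampMs % sizeMs);
    }

    // True when both timestamps fall into the same tumbling window.
    static boolean sameWindow(long t1, long t2, long sizeMs) {
        return windowStart(t1, sizeMs) == windowStart(t2, sizeMs);
    }

    public static void main(String[] args) {
        long size = Duration.ofMinutes(1).toMillis();
        long first = Instant.parse("2024-01-01T10:00:50Z").toEpochMilli();
        long second = Instant.parse("2024-01-01T10:01:05Z").toEpochMilli();
        // Only 15 seconds apart, but they straddle the 10:01:00 window boundary.
        System.out.println("same window? " + sameWindow(first, second, size));
        System.out.println("first window starts at " + Instant.ofEpochMilli(windowStart(first, size)));
    }
}
```

Running `main` reports that the two timestamps fall into different windows even though they were produced within one minute of each other.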

Is there a flaw in my topology?

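    import java.time.Duration;

    import org.apache.kafka.common.serialization.Serdes;
    import org.apache.kafka.common.utils.Bytes;
    import org.apache.kafka.streams.KeyValue;
    import org.apache.kafka.streams.StreamsBuilder;
    import org.apache.kafka.streams.kstream.Consumed;
    import org.apache.kafka.streams.kstream.Materialized;
    import org.apache.kafka.streams.kstream.Produced;
    import org.apache.kafka.streams.kstream.Suppressed;
    import org.apache.kafka.streams.kstream.TimeWindows;
    import org.apache.kafka.streams.kstream.Windowed;
    import org.apache.kafka.streams.state.Stores;
    import org.apache.kafka.streams.state.WindowBytesStoreSupplier;
    import org.apache.kafka.streams.state.WindowStore;

    public class WindowedCountTopology {
        public static final String INPUT_TOPIC  = "test-topic";
        public static final String OUTPUT_TOPIC = "test-output-topic";

        public static void buildTopo(StreamsBuilder builder) {
            // Persistent window store: 1 day retention, 1 minute window size
            // (must match the TimeWindows size below), duplicates not retained.
            WindowBytesStoreSupplier store = Stores.persistentTimestampedWindowStore(
                    "my-state-store",
                    Duration.ofDays(1),
                    Duration.ofMinutes(1),
                    false);

            // No value serde set here: count() falls back to a Long serde.
            Materialized<String, Long, WindowStore<Bytes, byte[]>> materialized = Materialized
                    .<String, Long>as(store)
                    .withKeySerde(Serdes.String());

            // Forward only the final count of each window, once the window has closed.
            Suppressed<Windowed> suppression = Suppressed
                    .untilWindowCloses(Suppressed.BufferConfig.unbounded());

            // Tumbling 1-minute windows with no grace period for late records.
            TimeWindows window = TimeWindows
                    .of(Duration.ofMinutes(1))
                    .grace(Duration.ofSeconds(0));

            builder.stream(INPUT_TOPIC, Consumed.with(Serdes.String(), Serdes.String()))
                    .peek((key, value) -> System.out.println("****key = " + key + " value= " + value))
                    .groupByKey()
                    .windowedBy(window)
                    .count(materialized)
                    .suppress(suppression)
                    .toStream()
                    // windowedKey has a string plus the kafka time window
                    .peek((key, value) -> System.out.println("key = " + key + " value= " + value))
                    // Drop the window and keep only the original String key.
                    .map((key, value) -> new KeyValue<>(key.key(), value))
                    .to(OUTPUT_TOPIC, Produced.with(Serdes.String(), Serdes.Long()));
        }
    }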
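Separately, `suppress(Suppressed.untilWindowCloses(...))` forwards a window's count only once stream time has reached the window end plus the grace period, and stream time advances only when new records arrive (it is tracked per stream task). With `grace(Duration.ofSeconds(0))`, a single test message therefore produces nothing in the second `peek` until a later record, timestamped in a later window, arrives on the same input partition; a record that arrives after its window has closed is dropped. The sketch below is a simplified single-key model of that behavior using plain Java collections, not the Kafka Streams implementation: `SuppressedCountSketch` and `process` are hypothetical, and partitions, record caching and the persistent `my-state-store` are ignored.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical, simplified single-key model (NOT Kafka Streams code) of
// windowedBy(tumbling).count().suppress(untilWindowCloses(...)).
// Stream time = largest timestamp seen so far; a window's final count is
// emitted once windowEnd + grace <= streamTime.
final class SuppressedCountSketch {
    private final long sizeMs;
    private final long graceMs;
    private long streamTime = Long.MIN_VALUE;
    // window start -> running count, ordered so the oldest window closes first
    private final TreeMap<Long, Long> openWindows = new TreeMap<>();
    // final results in emission order, formatted as "windowStart=count"
    final List<String> emitted = new ArrayList<>();

    SuppressedCountSketch(long sizeMs, long graceMs) {
        this.sizeMs = sizeMs;
        this.graceMs = graceMs;
    }

    void process(long timestampMs) {
        streamTime = Math.max(streamTime, timestampMs);
        long start = timestampMs - (timestampMs % sizeMs);
        // Count the record only if its window is still open; otherwise drop it.
        if (start + sizeMs + graceMs > streamTime) {
            openWindows.merge(start, 1L, Long::sum);
        }
        // Emit (and forget) every window that stream time has now closed.
        while (!openWindows.isEmpty()) {
            Map.Entry<Long, Long> oldest = openWindows.firstEntry();
            if (oldest.getKey() + sizeMs + graceMs > streamTime) {
                break;
            }
            emitted.add(oldest.getKey() + "=" + oldest.getValue());
            openWindows.pollFirstEntry();
        }
    }
}
```

In this model, records at 10 s and 20 s followed by one at 70 s emit `0=2` only when the 70 s record arrives, and a subsequent record at 30 s is dropped because its window has already closed.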