KafkaStreams:获得最终结果

rob*_*011 4 kotlin apache-kafka apache-kafka-streams

是否可以通过抑制中间结果来获得Kafka Streams中的窗口最终结果.

我无法实现这一目标.我的代码出了什么问题?

    val builder = StreamsBuilder()
    builder.stream<String,Double>(inputTopic)
            .groupByKey()
            .windowedBy(TimeWindows.of(Duration.ofSeconds(15)))
            .count()
            .suppress(Suppressed.untilWindowCloses(unbounded())) // not working)
            .toStream()
            .print(Printed.toSysOut())
Run Code Online (Sandbox Code Playgroud)

它会导致此错误:

Failed to flush state store KSTREAM-AGGREGATE-STATE-STORE-0000000001: 
java.lang.ClassCastException: org.apache.kafka.streams.kstream.Windowed cannot be cast to java.lang.String
Run Code Online (Sandbox Code Playgroud)

代码/错误详情:https://gist.github.com/robie2011/1caa4772b60b5a6f993e6f98e792a380

小智 13

问题是Streams在窗口期间自动包装显式serdes的方式存在令人困惑的不对称性,但不会自动换行默认serde.恕我直言,这是一个应该纠正的疏忽,所以我提交了:https://issues.apache.org/jira/browse/KAFKA-7806

正如其他人所指出的那样,解决方案是明确设置上游的密钥serde而不依赖于默认密钥serde.你可以:

在窗口聚合上设置serdes Materialized

val builder = StreamsBuilder()
builder.stream<String,Double>(inputTopic)
        .groupByKey()
        .windowedBy(TimeWindows.of(Duration.ofSeconds(15)))
        .count(Materialized.with(Serdes.String(), Serdes.Long()))
        .suppress(Suppressed.untilWindowCloses(unbounded())))
        .toStream()
        .print(Printed.toSysOut())
Run Code Online (Sandbox Code Playgroud)

(正如Nishu推荐的那样)

(请注意,没有必要命名count操作,这具有使其可查询的副作用)

或者将serdes设置在更上游,例如在输入上:

val builder = StreamsBuilder()
builder.stream<String,Double>(inputTopic, Consumed.with(Serdes.String(), Serdes.Double()))
        .groupByKey()
        .windowedBy(TimeWindows.of(Duration.ofSeconds(15)))
        .count()
        .suppress(Suppressed.untilWindowCloses(unbounded())))
        .toStream()
        .print(Printed.toSysOut())
Run Code Online (Sandbox Code Playgroud)

(作为wardziniak推荐)

这是你的选择; 我认为在这种情况下,两种情况都没有太大的不同.如果你做的是不同的聚合count,你可能Materialized无论如何都要设置值serde ,所以也许前者可能是更统一的风格.

我还注意到您的窗口定义没有设置宽限期.窗口关闭时间定义为window end + grace period,默认为24小时,因此在24小时的数据通过应用程序运行之前,您不会看到抑制发出的任何内容.

为了您的测试工作,我建议尝试:

.windowedBy(TimeWindows.of(Duration.ofSeconds(15)).grace(Duration.ZERO))
Run Code Online (Sandbox Code Playgroud)

在生产中,您需要选择一个宽限期,以平衡您在流中预期的事件延迟量与您希望从抑制中看到的排放快速度.

最后一点,我在你的要点中注意到你没有改变默认的缓存或提交间隔.因此,您会注意到count操作员本身将缓冲更新默认30秒,然后再将其传递给抑制.这是一个很好的生产配置,因此您不会为本地磁盘或Kafka代理创建瓶颈.但是当你进行测试时,它可能会给你带来惊喜.

通常用于测试(或交互式尝试输出),我将禁用缓存并将提交间隔设置为最短,以获得最大的开发人员健全性:

properties.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 0);
properties.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 100);
Run Code Online (Sandbox Code Playgroud)

对于serde疏忽感到抱歉.我希望很快就能解决KAFKA-7806问题.

我希望这有帮助!


Nis*_*yal 5

问题出在 KeySerde 上。由于WindowedBy操作导致Windowed<String>键入 key 但.suppress()使用的是默认键类型。

因此,您需要在调用 count 方法时在 State 存储上定义 KeySerde,如下所示:

      builder.stream<String,Double>inputTopic)
      .groupByKey()
      .windowedBy(TimeWindows.of(Duration.ofSeconds(15)))
      .count(Materialized.<String, Long, WindowStore<Bytes,byte[]>>as("count").withCachingDisabled().withKeySerde(Serdes.String()))
      .suppress(Suppressed.untilWindowCloses(BufferConfig.unbounded()))
      .toStream()
      . print(Printed.toSysOut());
Run Code Online (Sandbox Code Playgroud)