从FlinkKafkaConsumer迁移到KafkaSource,没有执行任何窗口

Bag*_*dus 5 apache-kafka apache-flink flink-streaming

我已经实现了FlinkKafkaConsumer消费来自卡夫卡主题的消息。除“组”和“主题”之外的唯一自定义设置是(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest")允许多次重新阅读相同的消息。它开箱即用,适合消费和逻辑。

现在FlinkKafkaConsumer弃用,我想更新到它的后继者KafkaSource

KafkaSource使用与我相同的参数进行初始化会FlinkKafkaConsumer按预期生成主题的读取,我可以通过打印流来验证这一点。反序列化和时间戳似乎工作正常。然而,窗口的执行尚未完成,因此不会产生任何结果。

我假设 中的某些默认设置KafkaSource与 的不同FlinkKafkaConsumer,但我不知道它们可能是什么。

KafkaSource -不工作

KafkaSource<TestData> source = KafkaSource.<TestData>builder()
     .setBootstrapServers(propertiesForKafka.getProperty("bootstrap.servers"))
     .setTopics(TOPIC)
     .setDeserializer(new CustomDeserializer())
     .setGroupId(GROUP_ID)
     .setStartingOffsets(OffsetsInitializer.earliest())
     .build();

DataStream<TestData> testDataStreamSource = env.fromSource(
     source,
     WatermarkStrategy. <
     TestData > noWatermarks(),
     "Kafka Source"
 );
Run Code Online (Sandbox Code Playgroud)

卡夫卡消费者 -工作

(属性包含group.idbootstrap.serverszookeeper.connect

propertiesForKafka.setProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
FlinkKafkaConsumer<TestData> flinkKafkaConsumer = new FlinkKafkaConsumer(TOPIC, new CustomDeserializer(), propertiesForKafka);
DataStreamSource<TestData> testDataStreamSource = env.addSource(flinkKafkaConsumer)
Run Code Online (Sandbox Code Playgroud)

两个流都使用相同的管道,如下所示

testDataStreamSource
    .assignTimestampsAndWatermarks(WatermarkStrategy.<TestData>forMonotonousTimestamps().withTimestampAssigner((event, timestamp) - > event.getTimestamp()))
    .keyBy(TestData::getKey)
    .window(SlidingEventTimeWindows.of(Time.hours(3), Time.hours(1)))
    .process(new ProcessWindowFunction<TestData, TestDataOutput, String, TimeWindow>() {
            @Override
            public void process(
                ....
            });
Run Code Online (Sandbox Code Playgroud)

尝试过的事情

  • 我尝试过设置偏移量的提交,但并没有改善情况。
  • 设置源中已有的时间戳。

Dav*_*son 3

更新:答案是,当 Kafka 分区数量小于 Flink 的 kafka 源算子的并行度时,KafkaSource 的行为与 FlinkKafkaConsumer 不同。有关详细信息,请参阅/sf/answers/4907090331/

原答案:

问题几乎肯定与时间戳和水印有关。

要验证时间戳和水印是否是问题所在,您可以进行一个快速实验,将 3 小时长的事件时间滑动窗口替换为短处理时间的滚动窗口。

一般来说,最好(但不是必需)让 KafkaSource 执行水印。forMonotonousTimestamps正如您现在所做的那样,在源之后应用的水印生成器中使用是一个冒险的举动。仅当源的每个并行实例所使用的所有分区中的时间戳均按顺序处理时,此操作才能正常工作。如果将多个 Kafka 分区分配给任何 KafkaSource 任务,则不会发生这种情况。另一方面,如果您forMonotonousTimestamps在 fromSource 调用(而不是noWatermarks)中提供水印策略,那么所需要的只是时间戳在每个分区的基础上按顺序排列,我想就是这种情况。

尽管这很令人不安,但它可能不足以解释为什么窗口不产生任何结果。另一个可能的根本原因是测试数据集不包含第一个窗口之后带有时间戳的任何事件,因此该窗口永远不会关闭。

你有水槽吗?如果没有,那就可以解释事情了。

您可以使用 Flink 仪表板来帮助调试。查看水印是否在窗口任务中前进。打开检查点,然后查看窗口任务有多少状态——它应该有一些非零状态量。