Bag*_*dus 5 apache-kafka apache-flink flink-streaming
我已经实现了FlinkKafkaConsumer
消费来自卡夫卡主题的消息。除“组”和“主题”之外的唯一自定义设置是(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest")
允许多次重新阅读相同的消息。它开箱即用,适合消费和逻辑。
现在FlinkKafkaConsumer
已弃用,我想更新到它的后继者KafkaSource
。
KafkaSource
使用与我相同的参数进行初始化会FlinkKafkaConsumer
按预期生成主题的读取,我可以通过打印流来验证这一点。反序列化和时间戳似乎工作正常。然而,窗口的执行尚未完成,因此不会产生任何结果。
我假设 中的某些默认设置KafkaSource
与 的不同FlinkKafkaConsumer
,但我不知道它们可能是什么。
KafkaSource<TestData> source = KafkaSource.<TestData>builder()
.setBootstrapServers(propertiesForKafka.getProperty("bootstrap.servers"))
.setTopics(TOPIC)
.setDeserializer(new CustomDeserializer())
.setGroupId(GROUP_ID)
.setStartingOffsets(OffsetsInitializer.earliest())
.build();
DataStream<TestData> testDataStreamSource = env.fromSource(
source,
WatermarkStrategy. <
TestData > noWatermarks(),
"Kafka Source"
);
Run Code Online (Sandbox Code Playgroud)
(属性包含group.id
、bootstrap.servers
和zookeeper.connect
)
propertiesForKafka.setProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
FlinkKafkaConsumer<TestData> flinkKafkaConsumer = new FlinkKafkaConsumer(TOPIC, new CustomDeserializer(), propertiesForKafka);
DataStreamSource<TestData> testDataStreamSource = env.addSource(flinkKafkaConsumer)
Run Code Online (Sandbox Code Playgroud)
两个流都使用相同的管道,如下所示
testDataStreamSource
.assignTimestampsAndWatermarks(WatermarkStrategy.<TestData>forMonotonousTimestamps().withTimestampAssigner((event, timestamp) - > event.getTimestamp()))
.keyBy(TestData::getKey)
.window(SlidingEventTimeWindows.of(Time.hours(3), Time.hours(1)))
.process(new ProcessWindowFunction<TestData, TestDataOutput, String, TimeWindow>() {
@Override
public void process(
....
});
Run Code Online (Sandbox Code Playgroud)
尝试过的事情
更新:答案是,当 Kafka 分区数量小于 Flink 的 kafka 源算子的并行度时,KafkaSource 的行为与 FlinkKafkaConsumer 不同。有关详细信息,请参阅/sf/answers/4907090331/。
原答案:
问题几乎肯定与时间戳和水印有关。
要验证时间戳和水印是否是问题所在,您可以进行一个快速实验,将 3 小时长的事件时间滑动窗口替换为短处理时间的滚动窗口。
一般来说,最好(但不是必需)让 KafkaSource 执行水印。forMonotonousTimestamps
正如您现在所做的那样,在源之后应用的水印生成器中使用是一个冒险的举动。仅当源的每个并行实例所使用的所有分区中的时间戳均按顺序处理时,此操作才能正常工作。如果将多个 Kafka 分区分配给任何 KafkaSource 任务,则不会发生这种情况。另一方面,如果您forMonotonousTimestamps
在 fromSource 调用(而不是noWatermarks
)中提供水印策略,那么所需要的只是时间戳在每个分区的基础上按顺序排列,我想就是这种情况。
尽管这很令人不安,但它可能不足以解释为什么窗口不产生任何结果。另一个可能的根本原因是测试数据集不包含第一个窗口之后带有时间戳的任何事件,因此该窗口永远不会关闭。
你有水槽吗?如果没有,那就可以解释事情了。
您可以使用 Flink 仪表板来帮助调试。查看水印是否在窗口任务中前进。打开检查点,然后查看窗口任务有多少状态——它应该有一些非零状态量。
归档时间: |
|
查看次数: |
5121 次 |
最近记录: |