我已经实现了FlinkKafkaConsumer消费来自卡夫卡主题的消息。除“组”和“主题”之外的唯一自定义设置是(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest")允许多次重新阅读相同的消息。它开箱即用,适合消费和逻辑。
现在FlinkKafkaConsumer已弃用,我想更新到它的后继者KafkaSource。
KafkaSource使用与我相同的参数进行初始化会FlinkKafkaConsumer按预期生成主题的读取,我可以通过打印流来验证这一点。反序列化和时间戳似乎工作正常。然而,窗口的执行尚未完成,因此不会产生任何结果。
我假设 中的某些默认设置KafkaSource与 的不同FlinkKafkaConsumer,但我不知道它们可能是什么。
KafkaSource<TestData> source = KafkaSource.<TestData>builder()
.setBootstrapServers(propertiesForKafka.getProperty("bootstrap.servers"))
.setTopics(TOPIC)
.setDeserializer(new CustomDeserializer())
.setGroupId(GROUP_ID)
.setStartingOffsets(OffsetsInitializer.earliest())
.build();
DataStream<TestData> testDataStreamSource = env.fromSource(
source,
WatermarkStrategy. <
TestData > noWatermarks(),
"Kafka Source"
);
Run Code Online (Sandbox Code Playgroud)
(属性包含group.id、bootstrap.servers和zookeeper.connect)
propertiesForKafka.setProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
FlinkKafkaConsumer<TestData> flinkKafkaConsumer = new FlinkKafkaConsumer(TOPIC, new CustomDeserializer(), propertiesForKafka);
DataStreamSource<TestData> testDataStreamSource = env.addSource(flinkKafkaConsumer)
Run Code Online (Sandbox Code Playgroud)
两个流都使用相同的管道,如下所示
testDataStreamSource
.assignTimestampsAndWatermarks(WatermarkStrategy.<TestData>forMonotonousTimestamps().withTimestampAssigner((event, timestamp) - …Run Code Online (Sandbox Code Playgroud)