小编Bag*_*dus的帖子

从FlinkKafkaConsumer迁移到KafkaSource,没有执行任何窗口

我已经实现了FlinkKafkaConsumer消费来自卡夫卡主题的消息。除“组”和“主题”之外的唯一自定义设置是(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest")允许多次重新阅读相同的消息。它开箱即用,适合消费和逻辑。

现在FlinkKafkaConsumer弃用,我想更新到它的后继者KafkaSource

KafkaSource使用与我相同的参数进行初始化会FlinkKafkaConsumer按预期生成主题的读取,我可以通过打印流来验证这一点。反序列化和时间戳似乎工作正常。然而,窗口的执行尚未完成,因此不会产生任何结果。

我假设 中的某些默认设置KafkaSource与 的不同FlinkKafkaConsumer,但我不知道它们可能是什么。

KafkaSource -不工作

KafkaSource<TestData> source = KafkaSource.<TestData>builder()
     .setBootstrapServers(propertiesForKafka.getProperty("bootstrap.servers"))
     .setTopics(TOPIC)
     .setDeserializer(new CustomDeserializer())
     .setGroupId(GROUP_ID)
     .setStartingOffsets(OffsetsInitializer.earliest())
     .build();

DataStream<TestData> testDataStreamSource = env.fromSource(
     source,
     WatermarkStrategy. <
     TestData > noWatermarks(),
     "Kafka Source"
 );
Run Code Online (Sandbox Code Playgroud)

卡夫卡消费者 -工作

(属性包含group.idbootstrap.serverszookeeper.connect

propertiesForKafka.setProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
FlinkKafkaConsumer<TestData> flinkKafkaConsumer = new FlinkKafkaConsumer(TOPIC, new CustomDeserializer(), propertiesForKafka);
DataStreamSource<TestData> testDataStreamSource = env.addSource(flinkKafkaConsumer)
Run Code Online (Sandbox Code Playgroud)

两个流都使用相同的管道,如下所示

testDataStreamSource
    .assignTimestampsAndWatermarks(WatermarkStrategy.<TestData>forMonotonousTimestamps().withTimestampAssigner((event, timestamp) - …
Run Code Online (Sandbox Code Playgroud)

apache-kafka apache-flink flink-streaming

5
推荐指数
1
解决办法
5121
查看次数