当我重新运行Flink消费者时,Kafka再次消费最新消息

Piy*_*ava 3 apache-kafka apache-flink

我在用Scala编写的Apache Flink API中创建了一个Kafka使用者.每当我从某个主题传递一些消息时,它就会接收它们.但是,当我重新启动使用者时,它会消耗发送到该主题的最新消息,而不是接收新消息或未消息消息.

这就是我在做的事情:

  1. 运行生产者:

    $ bin/kafka-console-producer.sh --broker-list localhost:9092 --topic corr2
    
    Run Code Online (Sandbox Code Playgroud)
  2. 运行消费者:

    val properties = new Properties()
    properties.setProperty("bootstrap.servers", "localhost:9092")
    properties.setProperty("zookeeper.connect", "localhost:2181")
    properties.setProperty("group.id", "test")
    
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    val st = env
        .addSource(new FlinkKafkaConsumer09[String]("corr2", new SimpleStringSchema(), properties))
    env.enableCheckpointing(5000)
    st.print()
    env.execute()
    
    Run Code Online (Sandbox Code Playgroud)
  3. 传递一些消息

  4. 阻止消费者
  5. 再次运行使用者打印我发送的最后一条消息.我希望它只打印新消息.

Rob*_*ger 6

您正在运行Kafka使用者,检查点间隔为5秒.因此,每隔5秒,Flink就会创建一个运营商状态(偏移量)的副本以进行恢复.

一旦检查点完成,它将让操作员知道检查点已完成.在该通知上,Kafka消费者向Zookeeper提交抵消.所以大约每5秒钟,我们将最后一个检查点的偏移写入ZK.

再次启动Flink作业时,它将在ZK中找到偏移并从那里继续.根据时间,将再次发送提交到ZK后收到的所有消息.

您无法避免此行为,因为.print()"运算符"不是检查点的一部分.它意味着作为调试实用程序.但是,参与检查点的数据接收器(例如滚动文件接收器)将确保不会将重复项写入文件系统.