Piy*_*ava 3 apache-kafka apache-flink
我在用Scala编写的Apache Flink API中创建了一个Kafka使用者.每当我从某个主题传递一些消息时,它就会接收它们.但是,当我重新启动使用者时,它会消耗发送到该主题的最新消息,而不是接收新消息或未消息消息.
这就是我在做的事情:
运行生产者:
$ bin/kafka-console-producer.sh --broker-list localhost:9092 --topic corr2
Run Code Online (Sandbox Code Playgroud)运行消费者:
val properties = new Properties()
properties.setProperty("bootstrap.servers", "localhost:9092")
properties.setProperty("zookeeper.connect", "localhost:2181")
properties.setProperty("group.id", "test")
val env = StreamExecutionEnvironment.getExecutionEnvironment
val st = env
.addSource(new FlinkKafkaConsumer09[String]("corr2", new SimpleStringSchema(), properties))
env.enableCheckpointing(5000)
st.print()
env.execute()
Run Code Online (Sandbox Code Playgroud)传递一些消息
您正在运行Kafka使用者,检查点间隔为5秒.因此,每隔5秒,Flink就会创建一个运营商状态(偏移量)的副本以进行恢复.
一旦检查点完成,它将让操作员知道检查点已完成.在该通知上,Kafka消费者向Zookeeper提交抵消.所以大约每5秒钟,我们将最后一个检查点的偏移写入ZK.
再次启动Flink作业时,它将在ZK中找到偏移并从那里继续.根据时间,将再次发送提交到ZK后收到的所有消息.
您无法避免此行为,因为.print()"运算符"不是检查点的一部分.它意味着作为调试实用程序.但是,参与检查点的数据接收器(例如滚动文件接收器)将确保不会将重复项写入文件系统.
| 归档时间: |
|
| 查看次数: |
1139 次 |
| 最近记录: |