我使用的是 flink 版本 1.8.0 。我的应用程序从 kafka 读取数据 -> 转换 -> 发布到 Kafka。为了避免重新启动期间出现任何重复,我想使用带有 Exactly Once 语义的 kafka 生产者,请在此处阅读:
我的卡夫卡版本是 1.1 。
return new FlinkKafkaProducer<String>( topic, new KeyedSerializationSchema<String>() {
public byte[] serializeKey(String element) {
// TODO Auto-generated method stub
return element.getBytes();
}
public byte[] serializeValue(String element) {
// TODO Auto-generated method stub
return element.getBytes();
}
public String getTargetTopic(String element) {
// TODO Auto-generated method stub
return topic;
}
},prop, opt, FlinkKafkaProducer.Semantic.EXACTLY_ONCE, 1);
Run Code Online (Sandbox Code Playgroud)
检查点代码:
CheckpointConfig checkpointConfig = env.getCheckpointConfig();
checkpointConfig.setCheckpointTimeout(15 * 1000 ); …Run Code Online (Sandbox Code Playgroud)