Tags: streaming, blink, apache-flink, flink-streaming
I'm writing a test case for Flink's two-phase commit; here is an overview.
The sink kafka is a Kafka producer. The sink step is a MySQL sink extending two-phase commit. The sink compare is also a MySQL sink extending two-phase commit; this one occasionally throws an exception to simulate a checkpoint failure.
When a checkpoint fails and the job recovers, I find that the MySQL two-phase commit works fine, but the Kafka consumer reads from the last successful offset and the Kafka producer produces those messages again, even though it had already produced them before the checkpoint failed.
How can I avoid duplicate messages in this case?
Thanks for any help.
Environment:
Flink 1.9.1
Java 1.8
Kafka 2.11
Kafka producer code:
dataStreamReduce.addSink(new FlinkKafkaProducer<>(
        "flink_output",
        new KafkaSerializationSchema<Tuple4<String, String, String, Long>>() {
            @Override
            public ProducerRecord<byte[], byte[]> serialize(Tuple4<String, String, String, Long> element, @Nullable Long timestamp) {
                UUID uuid = UUID.randomUUID();
                JSONObject jsonObject = new JSONObject();
                jsonObject.put("uuid", uuid.toString());
                jsonObject.put("key1", element.f0);
                jsonObject.put("key2", element.f1);
                jsonObject.put("key3", element.f2);
                jsonObject.put("indicate", element.f3);
                return new ProducerRecord<>("flink_output", jsonObject.toJSONString().getBytes(StandardCharsets.UTF_8));
            }
        },
        kafkaProps,
        FlinkKafkaProducer.Semantic.EXACTLY_ONCE
)).name("sink kafka");
Checkpoint settings:
StreamExecutionEnvironment executionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment();
executionEnvironment.enableCheckpointing(10000);
executionEnvironment.getCheckpointConfig().setTolerableCheckpointFailureNumber(0);
executionEnvironment.getCheckpointConfig().setPreferCheckpointForRecovery(true);
MySQL sink:
dataStreamReduce.addSink(
        new TwoPhaseCommitSinkFunction<Tuple4<String, String, String, Long>,
                Connection, Void>
                (new KryoSerializer<>(Connection.class, new ExecutionConfig()), VoidSerializer.INSTANCE) {

            int count = 0;
            Connection connection;

            @Override
            protected void invoke(Connection transaction, Tuple4<String, String, String, Long> value, Context context) throws Exception {
                if (count > 10) {
                    throw new Exception("compare test exception.");
                }
                PreparedStatement ps = transaction.prepareStatement(
                        " insert into test_two_step_compare(slot_time, key1, key2, key3, indicate) " +
                        " values(?, ?, ?, ?, ?) " +
                        " ON DUPLICATE KEY UPDATE indicate = indicate + values(indicate) "
                );
                ps.setString(1, context.timestamp().toString());
                ps.setString(2, value.f0);
                ps.setString(3, value.f1);
                ps.setString(4, value.f2);
                ps.setLong(5, value.f3);
                ps.execute();
                ps.close();
                count += 1;
            }

            @Override
            protected Connection beginTransaction() throws Exception {
                LOGGER.error("compare in begin transaction");
                try {
                    if (connection.isClosed()) {
                        throw new Exception("mysql connection closed");
                    }
                } catch (Exception e) {
                    LOGGER.error("mysql connection is error: " + e.toString());
                    LOGGER.error("reconnect mysql connection");
                    String jdbcURI = "jdbc:mysql://";
                    Class.forName("com.mysql.jdbc.Driver");
                    Connection connection = DriverManager.getConnection(jdbcURI);
                    connection.setAutoCommit(false);
                    this.connection = connection;
                }
                return this.connection;
            }

            @Override
            protected void preCommit(Connection transaction) throws Exception {
                LOGGER.error("compare in pre Commit");
            }

            @Override
            protected void commit(Connection transaction) {
                LOGGER.error("compare in commit");
                try {
                    transaction.commit();
                } catch (Exception e) {
                    LOGGER.error("compare Commit error: " + e.toString());
                }
            }

            @Override
            protected void abort(Connection transaction) {
                LOGGER.error("compare in abort");
                try {
                    transaction.rollback();
                } catch (Exception e) {
                    LOGGER.error("compare abort error." + e.toString());
                }
            }

            @Override
            protected void recoverAndCommit(Connection transaction) {
                super.recoverAndCommit(transaction);
                LOGGER.error("compare in recover And Commit");
            }

            @Override
            protected void recoverAndAbort(Connection transaction) {
                super.recoverAndAbort(transaction);
                LOGGER.error("compare in recover And Abort");
            }
        })
        .setParallelism(1).name("sink compare");
I'm not quite sure whether I understood the question correctly:
"When a checkpoint fails and the job recovers, I find that the MySQL two-phase commit works fine, but the Kafka producer will read the last successful offset and produce messages even though it had already done so before this checkpoint failed."
The Kafka producer does not read any data, so I assume your whole pipeline re-reads the old offsets and produces duplicates. If so, you need to understand how Flink ensures exactly-once: on recovery it rewinds the sources to the offsets stored in the last successful checkpoint and replays every record processed since then, so it is up to the sinks to make sure those replayed records do not end up as effective duplicates in the target system.
For that last point, there are two options:
- only write data out once the checkpoint covering it has completed, so that no effective duplicates can ever reach the target (sketched below);
- let the sink itself deduplicate the output.
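To make the first option concrete, here is a minimal sketch (it is not taken from your job: the record type is simplified to String, BufferOnCheckpointSink and writeToExternalSystem are placeholder names, and persisting the pending buffers in operator state is left out). The sink only buffers records in invoke and writes a buffer to the external system in notifyCheckpointComplete, i.e. once the checkpoint covering those records has succeeded and they can no longer be replayed:
import org.apache.flink.runtime.state.CheckpointListener;
import org.apache.flink.runtime.state.FunctionInitializationContext;
import org.apache.flink.runtime.state.FunctionSnapshotContext;
import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;

import java.util.ArrayList;
import java.util.List;
import java.util.NavigableMap;
import java.util.TreeMap;

public class BufferOnCheckpointSink extends RichSinkFunction<String>
        implements CheckpointedFunction, CheckpointListener {

    // records received since the last checkpoint barrier passed this sink
    private final List<String> currentBuffer = new ArrayList<>();
    // buffers waiting for their checkpoint to complete, keyed by checkpoint id
    private final NavigableMap<Long, List<String>> pendingPerCheckpoint = new TreeMap<>();

    @Override
    public void invoke(String value, Context context) {
        currentBuffer.add(value); // nothing is written to the target system yet
    }

    @Override
    public void snapshotState(FunctionSnapshotContext context) {
        // freeze everything seen so far under the id of the checkpoint being taken;
        // a full implementation would also store pendingPerCheckpoint in operator state
        pendingPerCheckpoint.put(context.getCheckpointId(), new ArrayList<>(currentBuffer));
        currentBuffer.clear();
    }

    @Override
    public void initializeState(FunctionInitializationContext context) {
        // a full implementation would restore the pending buffers from operator state here
    }

    @Override
    public void notifyCheckpointComplete(long checkpointId) {
        // this checkpoint succeeded, so Flink will never replay the records it covers:
        // it is now safe to write them out exactly once
        NavigableMap<Long, List<String>> ready = pendingPerCheckpoint.headMap(checkpointId, true);
        for (List<String> buffer : ready.values()) {
            writeToExternalSystem(buffer);
        }
        ready.clear();
    }

    // hypothetical placeholder for the actual write, e.g. a JDBC batch insert
    private void writeToExternalSystem(List<String> buffer) {
    }
}
This approach is universal (it works for any target system) but adds the checkpoint interval to the end-to-end latency.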
The latter option is what the Kafka sink uses. It relies on Kafka transactions to deduplicate the data. To avoid duplicates on the consumer side, you need to make sure the consumer does not read uncommitted data, as mentioned in the documentation. Also make sure the transaction timeout is large enough that data is not discarded between a failure and the recovery.
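As a sketch of what those two settings look like (the broker address, group id and timeout value are placeholders; the producer properties are the kafkaProps handed to FlinkKafkaProducer with Semantic.EXACTLY_ONCE):
import java.util.Properties;

public class KafkaExactlyOnceProps {

    // Consumer side: only read committed records; otherwise the consumer also sees data
    // from transactions that Flink may still abort after a failed checkpoint.
    static Properties consumerProps() {
        Properties props = new Properties();
        props.setProperty("bootstrap.servers", "broker:9092"); // placeholder address
        props.setProperty("group.id", "flink_output_reader");  // hypothetical group id
        props.setProperty("isolation.level", "read_committed");
        return props;
    }

    // Producer side: the transaction timeout must cover the time between a failure and
    // the recovery that commits the pending transaction, and must not exceed the broker's
    // transaction.max.timeout.ms.
    static Properties producerProps() {
        Properties props = new Properties();
        props.setProperty("bootstrap.servers", "broker:9092"); // placeholder address
        props.setProperty("transaction.timeout.ms", String.valueOf(15 * 60 * 1000)); // example value
        return props;
    }
}
Keep in mind that Flink's Kafka producer defaults to a one-hour transaction timeout, while brokers cap transactions at transaction.max.timeout.ms (15 minutes by default), so one of the two usually has to be adjusted.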