小编Sri*_*r S的帖子

如何在apache kafka连接器中实现一次语义

我使用的是 flink 版本 1.8.0 。我的应用程序从 kafka 读取数据 -> 转换 -> 发布到 Kafka。为了避免重新启动期间出现任何重复,我想使用带有 Exactly Once 语义的 kafka 生产者,请在此处阅读:

https://ci.apache.org/projects/flink/flink-docs-release-1.8/dev/connectors/kafka.html#kafka-011-and-newer

我的卡夫卡版本是 1.1 。

        return new FlinkKafkaProducer<String>( topic,  new KeyedSerializationSchema<String>() {


            public byte[] serializeKey(String element) {
                // TODO Auto-generated method stub
                return element.getBytes();
            }


            public byte[] serializeValue(String element) {
                // TODO Auto-generated method stub
                return element.getBytes();
            }


            public String getTargetTopic(String element) {
                // TODO Auto-generated method stub
                return topic;
            }
        },prop, opt, FlinkKafkaProducer.Semantic.EXACTLY_ONCE, 1);
Run Code Online (Sandbox Code Playgroud)

检查点代码:

    CheckpointConfig checkpointConfig = env.getCheckpointConfig();
    checkpointConfig.setCheckpointTimeout(15 * 1000 ); …
Run Code Online (Sandbox Code Playgroud)

apache-flink flink-streaming

3
推荐指数
1
解决办法
1446
查看次数

标签 统计

apache-flink ×1

flink-streaming ×1