如何在apache kafka连接器中实现一次语义

Sri*_*r S 3 apache-flink flink-streaming

我使用的是 flink 版本 1.8.0 。我的应用程序从 kafka 读取数据 -> 转换 -> 发布到 Kafka。为了避免重新启动期间出现任何重复,我想使用带有 Exactly Once 语义的 kafka 生产者,请在此处阅读:

https://ci.apache.org/projects/flink/flink-docs-release-1.8/dev/connectors/kafka.html#kafka-011-and-newer

我的卡夫卡版本是 1.1 。

        return new FlinkKafkaProducer<String>( topic,  new KeyedSerializationSchema<String>() {


            public byte[] serializeKey(String element) {
                // TODO Auto-generated method stub
                return element.getBytes();
            }


            public byte[] serializeValue(String element) {
                // TODO Auto-generated method stub
                return element.getBytes();
            }


            public String getTargetTopic(String element) {
                // TODO Auto-generated method stub
                return topic;
            }
        },prop, opt, FlinkKafkaProducer.Semantic.EXACTLY_ONCE, 1);
Run Code Online (Sandbox Code Playgroud)

检查点代码:

    CheckpointConfig checkpointConfig = env.getCheckpointConfig();
    checkpointConfig.setCheckpointTimeout(15 * 1000 );
    checkpointConfig.enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
    env.enableCheckpointing(5000 );
Run Code Online (Sandbox Code Playgroud)

如果我在 kafka Producer 中添加一次语义,我的 flink 消费者不会读取任何新数据。

任何人都可以分享任何示例代码/应用程序与 Exactly Once Semantics 吗?

请在这里找到完整的代码:

https://github.com/sris2/sample_flink_exactly_once

谢谢

Arv*_*ise 5

\n

任何人都可以分享任何示例代码/应用程序与 Exactly Once Semantics 吗?

\n
\n\n

恰好一次的示例隐藏在flink 的端到端测试中。由于它使用了一些方便的功能,因此如果不检查整个存储库,可能很难理解。

\n\n
\n

如果我在 kafka 生产者中添加一次语义,我的 flink 消费者\n 不会读取任何新数据。\n [...]\n 请在此处找到完整的代码:

\n\n

https://github.com/sris2/sample_flink_exactly_once

\n
\n\n

我检查了您的代码并发现了问题(必须修复整个设置/代码才能实际运行)。接收器实际上无法正确配置交易。正如Flink Kafka 连接器文档中所写,您需要将transaction.timeout.msKafka 代理中的时间调整为最多 1 小时,或将应用程序中的时间调整为 15 分钟:

\n\n
    prop.setProperty("transaction.timeout.ms", "900000");\n
Run Code Online (Sandbox Code Playgroud)\n\n

相应的摘录是:

\n\n
\n

默认情况下,Kafka 代理将 transaction.max.timeout.ms 设置为 15 分钟。此属性不允许为生产者设置大于 it\xe2\x80\x99s 值的事务超时。FlinkKafkaProducer011默认将生产者配置中的transaction.timeout.ms属性设置为1小时,因此在使用Semantic.EXACTLY_ONCE模式之前应该增加transaction.max.timeout.ms。

\n
\n