Kafka S3 Connector 一次交付保证如何工作

I a*_*rge 0 amazon-s3 apache-kafka apache-kafka-connect confluent-platform

我已经阅读了他们的博客并理解了他们的例子。 https://www.confluent.io/blog/apache-kafka-to-amazon-s3-exactly-once/

但我正在努力解决我所拥有的这种情况。我目前的配置是:

"flush.size": "50",
"rotate.interval.ms": "-1",
"rotate.schedule.interval.ms": "300000",
"partitioner.class": "io.confluent.connect.storage.partitioner.TimeBasedPartitioner",
"partition.duration.ms": "3600000",
"path.format": "YYYY/MM/dd/HH",
"timestamp.extractor": "Wallclock"
Run Code Online (Sandbox Code Playgroud)

根据我对配置的了解。连接器将50300000ms(5 分钟)后提交记录文件或文件,以先到者为准。如果连接器将文件上传到 s3 但未能提交到 Kafka,由于我设置了轮换计划间隔,Kafka 如何重新上传将覆盖 s3 文件的相同记录?这不会导致 s3 重复吗?

Ran*_*uch 7

S3信宿连接的文档是描述连接器如何能保证正好一次传递到S3,更重要的是它的特征组合提供(或不提供),其担保的另一个很好的资源。

具体来说,该文件中的一个部分说:

为了保证与 的完全一次语义TimeBasedPartitioner,连接器必须配置为使用 的确定性实现TimestampExtractor和确定性旋转策略。确定性时间戳提取器是 Kafka 记录 ( timestamp.extractor=Record) 或记录字段 ( timestamp.extractor=RecordField)。确定性轮换策略配置是rotate.interval.ms(设置rotate.schedule.interval.ms是非确定性的,并且会使恰好一次的保证无效)。

您的 S3 接收器连接器配置确实使用了确定性分区器(通过“partitioner.class”:“io.confluent.connect.storage.partitioner.TimeBasedPartitioner”),但它使用了非确定性 Wallclock 时间戳提取器(通过"timestamp.extractor": "Wallclock")。这是不确定的,因为如果连接器确实必须重新启动(例如,由于故障)并重新处理特定记录,它将在稍后重新处理该记录,并且挂钟时间戳提取器将为该记录选择不同的时间。

其次,您的连接器使用该rotate.schedule.interval.ms选项,文档指出该选项与一次交付不兼容。例如,如果连接器确实必须重新处理一系列 Kafka 记录,它可能会将记录分解为与第一次不同的 S3 对象,这意味着 S3 连接器最终会写入不同的 S3 对象。

总之,具有您的配置的 S3 接收器连接器不会提供完全一次交付保证。