我们如何强制融合 kafka 连接 s3 接收器进行冲洗

Xia*_*ang 4 amazon-s3 apache-kafka apache-kafka-connect confluent-platform

我设置了 kafka 连接 s3 接收器,持续时间设置为 1 小时,并且我设置了一个相当大的刷新计数,比如 10,000。现在如果kafka通道中的消息不多,s3 sink会尝试将它们缓存在内存中,等待它累积到flush计数,然后将它们一起上传并将偏移量提交给自己的消费者组。

但是想想这种情况。如果在频道里,我只发送5000条消息。然后没有 s3 水槽冲洗。然后时间长了,这5000条消息最终会因为保留时间的原因从kafka中驱逐出去。但是这些消息仍然在 s3 sink 的内存中,而不是在 s3 中。这是非常危险的,例如,如果我们重新启动 s3 sink 或运行 s3 sink 的机器就崩溃了。然后我们丢失了那 5,000 条消息。我们无法从 kafka 中再次找到它们,因为它已经被删除了。

这会发生在 s3 sink 上吗?或者有一些设置会强制它在一段时间后刷新?

Kon*_*sis 5

如果您从 Kafka 到 S3 的流没有恒定的记录流,您可以使用该属性

旋转.schedule.interval.ms

以预定的时间间隔刷新记录。

请注意,在重新处理的情况下,如果使用此选项,您的下游系统应该能够处理重复项。这是因为如果连接器计划从 Kafka 重新导出记录,则基于挂钟刷新此类记录可能会导致重复出现在不同的文件中。

作为旁注,如果您使用财产:

旋转.间隔.ms

使用Wallclock时间戳提取器 ( timestamp.extractor=Wallclock),您的记录将在不设置rotate.schedule.interval.ms. 但这意味着您的分区器依赖于挂钟,因此您应该能够考虑重复记录。

连接器能够在具有确定性分区器的恒定记录流上提供一次性交付,并具有各种时间戳提取器,例如依赖于记录的时间戳 ( Record) 或字段时间戳 ( RecordField) 的提取器。

此处分区的配置属性