Kafka-connect sink任务忽略文件偏移存储属性

bsi*_*nau 11 java apache-kafka apache-kafka-connect

使用Confluent JDBC连接器时,我遇到了很奇怪的行为.我很确定它与Confluent堆栈无关,而是与Kafka-connect框架本身无关.

因此,我将offset.storage.file.filename属性定义为默认值/tmp/connect.offsets并运行我的接收器连接器.显然,我希望连接器在给定文件中保持偏移量(它在文件系统中不存在,但它应该自动创建,对吧?).文件说:

offset.storage.file.filename 要存储连接器偏移量的文件.通过在磁盘上存储偏移量,可以在单个节点上停止并启动独立进程,并从之前停止的位置继续.

但卡夫卡表现得完全不同.

  1. 它检查给定文件是否存在.
  2. 事实并非如此,卡夫卡只是忽略了它,并在卡夫卡主题中持续抵消.
  3. 如果我手动创建给定文件,则无论如何都会读取失败(EOFException),并且主题中的偏移将再次保留.

这是一个错误,或者更可能的是,我不明白如何使用这种配置?我理解两种保持偏移和文件存储的方法之间的区别对我的需求更方便.

G Q*_*ana 7

offset.storage.file.filename仅在使用的连接器。它用于在输入数据源上放置书签并记住它停止读取它的位置。创建的文件包含诸如文件行号(对于文件源)或表行号(对于 jdbc 源或一般数据库)之类的内容。

在分布式模式下运行 Kafka Connect 时,此文件将替换为默认命名的 Kafka 主题,该主题connect-offsets应被复制以容忍故障。

就接收连接器而言,无论使用哪种插件或模式(独立/分布式),它们都将上次停止读取输入主题的位置存储在一个名为__consumer_offsets任何 Kafka 消费者的内部主题中。这允许使用诸如kafka-consumer-groups.sh命令行工具之类的传统工具来处理接收器连接器滞后的程度。

汇合卡夫卡复制,尽管是一个源连接器,可能是一个例外,因为它从一个远程卡夫卡读取并可以使用卡夫卡消费者。

我同意文档不清楚,无论连接器类型是什么(源或接收器),都需要此设置,但它仅用于源连接器。这个设计决定背后的原因是单个 Kafka Connect 工作器(我的意思是单个 JVM 进程)可以运行多个连接器,可能是源连接器和接收器连接器。换句话说,这个设置是工人级别的设置,而不是连接器设置。


daw*_*saw 1

该属性offset.storage.file.filename仅适用于以独立模式运行的源连接器的工作线程。如果您看到 Kafka 在源的 Kafka 主题中保留偏移量,则说明您正在以分布式模式运行。您应该使用提供的脚本启动连接器connect-standalone这里有不同模式的描述。有关在不同模式下运行的说明请参见此处

  • Sink 连接器使用 Kafka 消费者组功能来存储偏移量。连接的偏移文件或主题实际上仅适用于一般源连接器。 (5认同)