Phi*_*ann 2 apache-kafka apache-kafka-connect
我正在设置一个 Kafka Connect 分布式模式应用程序,它将是一个 Kafka 到 S3 的管道。我正在使用 Kafka 0.10.1.0-1 和 Kafka Connect 3.1.1-1。到目前为止,事情进展顺利,但对我正在使用的更大系统很重要的一个方面需要知道 Kafka -> FileSystem 管道的偏移信息。根据文档,offset.storage.topic配置将是分布式模式应用程序用于存储偏移信息的位置。考虑到 Kafka 如何在“新”Kafka 中存储消费者偏移量,这是有道理的。但是,在对 FileStreamSinkConnector 进行了一些测试后,没有任何内容写入 myoffset.storage.topic默认值:connect-offsets。
具体来说,我使用 Python Kafka 生产者将数据推送到主题,并使用 Kafka Connect 和 FileStreamSinkConnect 将数据从主题输出到文件。这可以正常工作并按照我期望的连接器行为运行。此外,当我停止连接器并启动连接器时,应用程序会记住主题中的状态并且没有数据重复。但是,当我去offset.storage.topic查看存储了哪些偏移元数据时,主题中没有任何内容。
这是我使用的命令:
kafka-console-consumer --bootstrap-server kafka1:9092,kafka2:9092,kafka3:9092 --topic connect-offsets --from-beginning
让此命令运行一分钟左右后,我收到此消息:
Processed a total of 0 messages
总而言之,我有两个问题:
谢谢您的帮助。
Liju 是正确的,connect-offsets 用于跟踪源连接器(具有生产者但没有消费者)的偏移量。接收器连接器有一个消费者并以通常的方式跟踪偏移量 - __consumer_offsets 主题
查看最后提交的偏移量的最佳方法是使用消费者组工具:
bin/kafka-consumer-groups.sh --group connect-elastic-login-connector --bootstrap-server localhost:9092 --describe
组名始终是“connect-”和连接器名称(在我的情况下,elastic-login-connector)。这将显示该组提交的最新偏移量,这基本上确认了该偏移量之前的所有消息都已写入 Elastic。
| 归档时间: |
|
| 查看次数: |
3015 次 |
| 最近记录: |