Kafka MirrorMaker 的消费者没有从主题中获取所有消息

use*_*708 4 scala apache-kafka

我正在尝试设置 Kafka Mirror 机制,但似乎来自源 Kafka 集群的 Kafka MirrorMaker 的使用者仅在镜像制造商进程启动后才从新传入的数据读取到主题,即它不会读取历史上保存的数据之前的话题。

我为此使用 Kafka MirrorMaker 类:

/bin/kafka-run-class.sh kafka.tools.MirrorMaker --consumer.config consumer.config --num.streams 2 --producer.config producer.config --whitelist=".*"

consumer.config 从 Kafka 源集群读取,如:

zookeeper.connect=127.0.0.1:2181

zookeeper.connection.timeout.ms=6000

group.id=kafka-mirror

以及producer.config生成新 Kafka 镜像集群的设置:

metadata.broker.list=localhost:9093

producer.type=sync

compression.codec=none

serializer.class=kafka.serializer.DefaultEncoder

有没有办法定义 Kafka MirrorMaker 的消费者从我的源 Kafka 集群的主题开始读取?有点奇怪,因为我在consumer.config设置中定义了一个新的消费者组 ( kafka-mirror),所以消费者应该只是从offset 0,即从主题的开头读取。

提前谢谢了!

小智 8

在消费者属性中,添加

auto.offset.reset=earliest

这应该工作


cod*_*tsu 6

查看auto.offset.resetKafka消费者配置中的参数。

来自 Kafka 文档:

auto.offset.reset 最大

当 Zookeeper 中没有初始偏移量或偏移量超出范围时该怎么办: * 最小:自动将偏移量重置为最小偏移量 * 最大:自动将偏移量重置为最大偏移量 * 其他:将异常抛出到消费者。如果将其设置为最大,则当代理上订阅的主题的分区数量发生变化时,消费者可能会丢失一些消息。为了防止在分区添加过程中数据丢失,请将 auto.offset.reset 设置为最小

因此,使用smallestforauto.offset.reset应该可以解决您的问题。