我们使用MirrorMaker2将一些主题从一个 Kerberized kafka 集群复制到另一个 kafka 集群(严格单向)。我们不控制源 kafka 集群,我们只能访问描述和读取要使用的特定主题。
MirrorMaker2 在源集群中创建并维护一个主题 ( mm2-offset-syncs),以对要复制的每个主题分区的集群到集群偏移映射进行编码,并且还在AdminClient源集群中创建一个主题来处理 ACL/Config 传播。因为 MM2 需要授权才能在源集群中创建和写入这些主题,或者通过 执行操作AdminClient,所以我试图了解为什么/是否需要在我们的场景中使用这些机制。
我的问题是:
AdminClient不用于其他任何用途?在 MirrorMaker 代码中,偏移同步主题很容易在启动时创建MirrorSourceConnector,然后由MirrorSourceTask. 同样的情况也发生在MirrorSourceConnector.
我没有找到关闭这些功能的方法,但老实说,我可能遗漏了一些想法。
我正在尝试在 2 个集群之间设置复制,但不希望更改主题名称。例如,如果我有一个名为“some_topic”的主题,它会自动复制到“cluster1.some_topic”,我很确定这可以完成,但还没有找到正确的配置来改变它
我当前的配置“mirrormaker2.properties”
# Sample MirrorMaker 2.0 top-level configuration file
# Run with ./bin/connect-mirror-maker.sh connect-mirror-maker.properties
# specify any number of cluster aliases
clusters = cluster1, cluster2
# connection information for each cluster
cluster1.bootstrap.servers = host1:9092,host2:9092,host3:9092
cluster2.bootstrap.servers = rep_host1:9092,rep_host2:9092,rep_host3:9092
# enable and configure individual replication flows
cluster1->cluster2.enabled = true
cluster1->cluster2.topics = sometopic.*
# customize as needed
# replication.policy.separator = _
# sync.topic.acls.enabled = false
# emit.heartbeats.interval.seconds = 5
Run Code Online (Sandbox Code Playgroud)
以供参考:
我已经设置了 MirrorMaker2 来在 2 个 DC 之间复制数据。
我的 mm2.properties,
# mm2.properties
name=source->dest
clusters=source, dest
source.bootstrap.servers=localhost:9091
dest.bootstrap.servers=localhost:9092
source->dest.enabled=true
offset.storage.partitions=2
config.storage.replication.factor=1
status.storage.replication.factor=1
Run Code Online (Sandbox Code Playgroud)
在MM2启动时看到以下内容。
[2020-02-16 07:31:07,547] INFO MirrorConnectorConfig values:
admin.timeout.ms = 60000
checkpoints.topic.replication.factor = 3
config.action.reload = restart
config.properties.blacklist = [follower\.replication\.throttled\.replicas, leader\.replication\.throttled\.replicas, message\.timestamp\.difference\.max\.ms, message\.timestamp\.type, unclean\.leader\.election\.enable, min\.insync\.replicas]
config.property.filter.class = class org.apache.kafka.connect.mirror.DefaultConfigPropertyFilter
connector.class = org.apache.kafka.connect.mirror.MirrorCheckpointConnector
consumer.poll.timeout.ms = 1000
emit.checkpoints.enabled = true
emit.checkpoints.interval.seconds = 60
emit.heartbeats.enabled = true
emit.heartbeats.interval.seconds = 1
enabled = true
errors.log.enable = false
errors.log.include.messages = false
errors.retry.delay.max.ms = 60000
errors.retry.timeout = 0 …Run Code Online (Sandbox Code Playgroud) 我正在使用 MirrorMaker2 进行灾难恢复。
Kafka 2.7 应支持 自动消费者偏移同步
这是我正在使用的 yaml 文件(我使用 strimzi 来创建它)
所有源集群主题都会复制到目标集群中。另外...checkpoint.internal主题是在目标集群中创建的,其中包含同步的所有源集群偏移量,但我没有看到这些偏移量被转换为目标集群_consumer_offsets主题,这意味着我何时将在目标中启动消费者(同一消费者组) cluster 它将从头开始读取消息。
我的期望是,在允许自动消费者偏移量后,同步来自翻译并存储在目标集群中的 _consumer_offsets 主题中的源集群的所有消费者偏移量。
有人可以澄清我的期望是否正确,如果不正确,它应该如何运作。
我们正在使用镜像生成器来同步本地和 AWS Kafka 主题。如何在其他集群(本例中为 AWS)中以完全相同的方式复制在本地注册的架构的主题?如何使用镜像制作器复制 Avro 架构?
apache-kafka confluent-schema-registry apache-kafka-mirrormaker
我正在尝试使用 MirrorMaker 2.0 复制 Kafka 集群。我正在使用以下 mm2.properties:
name = mirror-site1-site2
topics = .*
connector.class = org.apache.kafka.connect.mirror.MirrorSourceConnector
tasks.max = 1
plugin.path=/usr/share/java/kafka/plugin
clusters = site1, site2
# for demo, source and target clusters are the same
source.cluster.alias = site1
target.cluster.alias = site2
site1.sasl.mechanism=SCRAM-SHA-256
site1.security.protocol=SASL_PLAINTEXT
site1.sasl.jaas.config=org.apache.kafka.common.security.scram.ScramLoginModule required \
username="<someuser>" \
password="<somepass>";
site2.sasl.mechanism=SCRAM-SHA-256
site2.security.protocol=SASL_PLAINTEXT
site2.sasl.jaas.config=org.apache.kafka.common.security.scram.ScramLoginModule required \
username="<someuser>" \
password="<somepass>";
site1.bootstrap.servers = <IP1>:9093, <IP2>:9093, <IP3>:9093, <IP4>:9093
site2.bootstrap.servers = <IP5>:9093, <IP6>:9093, <IP7>:9093, <IP8>:9093
site1->site2.enabled = true
site1->site2.topics = topic1
# use ByteArrayConverter to ensure that …Run Code Online (Sandbox Code Playgroud) Apache Kafka随着 Kafka 2.4 的发布引入了Mirrormaker2 (MM2)。MM2明显优于MM1。
我知道从架构的角度来看,MM1 过去使用生产者和消费者 API 工作,而 MM2 使用连接 API。我相信MM2的设计灵感来自于Confluence Replicator。Confluence Replicator 与 Confluence 工具完美集成。但除此之外,MM2 和 Confluence Replicator 之间有什么区别?
可以说我有两个Kafka集群,我正在使用镜像制作器将主题从一个集群镜像到另一个集群。我了解消费者有一个嵌入式生产者来向__consumer-offsetKafka集群中的主题提交偏移量。我需要知道如果主要的Kafka群集出现故障会怎样?我们是否也同步__consumer-offset主题?我认为,由于辅助群集可能具有不同数量的代理和其他设置。
请告诉我们Kafka镜像集群如何处理消费者补偿?
是否auto.offset.reset设置在这里发挥作用?
我从最初的 Kafka MirrorMaker 迁移到MirrorMaker 2.0,以便将主题从一个集群复制到另一个集群。我正在运行一个专用的 MirrorMaker 集群,如文档中所述。
假设我正在复制一个名为 的主题test-topic。
Cluster A Cluster B
---------- ----------
test-topic ---> A.test-topic
Run Code Online (Sandbox Code Playgroud)
如何确定A.test-topic落后了多远test-topic?
最初的 MirrorMaker 创建了消费者组,所以我提到了该消费者组的滞后。MirrorMaker 2.0 不会创建消费者组,因此我无法使用它来确定延迟。
我在 docker 容器中运行 mirrormaker,当我运行 mirrormaker 时,我得到以下错误。
[2019-09-27 14:38:14,279] ERROR [mirrormaker-thread-0] 镜像制造商线程异常退出,停止整个镜像制造商。(kafka.tools.MirrorMaker$MirrorMakerThread) [2019-09-27 14:38:14,280] ERROR [mirrormaker-thread-1] Mirror Maker 线程异常退出,停止整个镜像制造商。(kafka.tools.MirrorMaker$MirrorMakerThread)
有人可以帮我解决这个问题。
谢谢
我已设置 MirrorMaker2 来复制 2 个 Apache Kafka 集群。一切似乎都进展顺利。我唯一的问题是,当从CLUSTER1中删除主题(在 cluster2 上复制为CLUSTER1.topic)时,cluster2 上的复制主题CLUSTER1.topic不会被删除。因此,主题的删除似乎不会复制到另一个集群.mm2.properties 上有一些特定的属性可以控制主题的复制删除吗?
谢谢