Bre*_*ion 8 apache-kafka apache-kafka-mirrormaker
我正在尝试在 2 个集群之间设置复制,但不希望更改主题名称。例如,如果我有一个名为“some_topic”的主题,它会自动复制到“cluster1.some_topic”,我很确定这可以完成,但还没有找到正确的配置来改变它
我当前的配置“mirrormaker2.properties”
# Sample MirrorMaker 2.0 top-level configuration file
# Run with ./bin/connect-mirror-maker.sh connect-mirror-maker.properties
# specify any number of cluster aliases
clusters = cluster1, cluster2
# connection information for each cluster
cluster1.bootstrap.servers = host1:9092,host2:9092,host3:9092
cluster2.bootstrap.servers = rep_host1:9092,rep_host2:9092,rep_host3:9092
# enable and configure individual replication flows
cluster1->cluster2.enabled = true
cluster1->cluster2.topics = sometopic.*
# customize as needed
# replication.policy.separator = _
# sync.topic.acls.enabled = false
# emit.heartbeats.interval.seconds = 5
Run Code Online (Sandbox Code Playgroud)
以供参考:
小智 15
从Kafka 3.0.0开始,设置就足够了
replication.policy.class=org.apache.kafka.connect.mirror.IdentityReplicationPolicy
Run Code Online (Sandbox Code Playgroud)
此外,marcin-wieloch 的答案/sf/answers/4243346341/中的 PrefixlessReplicationPolicy不再适用于 3.0.0 (NullPointerException)。
Mar*_*och 11
要“禁用”主题前缀并同时正确镜像主题属性,我必须提供自定义的复制策略,该策略也覆盖该topicSource方法。否则"cleanup.policy=compact",即使在重新启动镜像生成器之后,也不会镜像非默认主题属性(例如,)。
这是对我有用的完整程序:
public class PrefixlessReplicationPolicy extends DefaultReplicationPolicy {
private static final Logger log = LoggerFactory.getLogger(PrefixlessReplicationPolicy.class);
private String sourceClusterAlias;
@Override
public void configure(Map<String, ?> props) {
super.configure(props);
sourceClusterAlias = (String) props.get(MirrorConnectorConfig.SOURCE_CLUSTER_ALIAS);
if (sourceClusterAlias == null) {
String logMessage = String.format("Property %s not found", MirrorConnectorConfig.SOURCE_CLUSTER_ALIAS);
log.error(logMessage);
throw new RuntimeException(logMessage);
}
}
@Override
public String formatRemoteTopic(String sourceClusterAlias, String topic) {
return topic;
}
@Override
public String topicSource(String topic) {
return topic == null ? null : sourceClusterAlias;
}
@Override
public String upstreamTopic(String topic) {
return null;
}
}
Run Code Online (Sandbox Code Playgroud)
${KAFKA_HOME/libs目录中replication.policy.class在${KAFKA_HOME}/config/mm2.properties以下位置设置属性,将 Mirror Maker 2 配置为使用该复制策略: replication.policy.class=ch.mawileo.kafka.mm2.PrefixlessReplicationPolicy
Run Code Online (Sandbox Code Playgroud)
小智 8
我能够使用此设置删除前缀:
"replication.policy.separator": ""
"source.cluster.alias": "",
"target.cluster.alias": "",
Run Code Online (Sandbox Code Playgroud)
如果您的情况需要别名设置,我知道您应该使用其他 replicationPolicy 类。默认使用 DefaultReplicationPolicy 类(https://kafka.apache.org/24/javadoc/org/apache/kafka/connect/mirror/DefaultReplicationPolicy.html)
小智 6
我认为上面的回答不恰当。
在Mirror Maker 2.0中,如果你想保持主题不被修改,你必须实现ReplicationPolicy。
您可以参考 DefaultReplicationPolicy.class ,然后覆盖formatRemoteTopic(),之后您必须删除sourceClusterAlias + separator。最后在配置replication.policy.class中mm2.properties
我定义了MigrationReplicationPolicy.class
replication.policy.class = org.apache.kafka.connect.mirror.MigrationReplicationPolicy
Run Code Online (Sandbox Code Playgroud)
你应该看看MirrorClientConfig,class,我知道你会理解的
| 归档时间: |
|
| 查看次数: |
3876 次 |
| 最近记录: |