是否可以使用 MirrorMaker2 复制没有别名前缀的 kafka 主题

Bre*_*ion 8 apache-kafka apache-kafka-mirrormaker

我正在尝试在 2 个集群之间设置复制,但不希望更改主题名称。例如,如果我有一个名为“some_topic”的主题,它会自动复制到“cluster1.some_topic”,我很确定这可以完成,但还没有找到正确的配置来改变它

我当前的配置“mirrormaker2.properties”

# Sample MirrorMaker 2.0 top-level configuration file
# Run with ./bin/connect-mirror-maker.sh connect-mirror-maker.properties 

# specify any number of cluster aliases
clusters = cluster1, cluster2

# connection information for each cluster
cluster1.bootstrap.servers = host1:9092,host2:9092,host3:9092
cluster2.bootstrap.servers = rep_host1:9092,rep_host2:9092,rep_host3:9092

# enable and configure individual replication flows
cluster1->cluster2.enabled = true
cluster1->cluster2.topics = sometopic.*

# customize as needed
# replication.policy.separator = _
# sync.topic.acls.enabled = false
# emit.heartbeats.interval.seconds = 5
Run Code Online (Sandbox Code Playgroud)

以供参考:

小智 15

从Kafka 3.0.0开始,设置就足够了

replication.policy.class=org.apache.kafka.connect.mirror.IdentityReplicationPolicy
Run Code Online (Sandbox Code Playgroud)

此外,marcin-wieloch 的答案/sf/answers/4243346341/中的 PrefixlessReplicationPolicy不再适用于 3.0.0 (NullPointerException)。


Mar*_*och 11

要“禁用”主题前缀并同时正确镜像主题属性,我必须提供自定义的复制策略,该策略也覆盖该topicSource方法。否则"cleanup.policy=compact",即使在重新启动镜像生成器之后,也不会镜像非默认主题属性(例如,)。

这是对我有用的完整程序:

  1. 将以下自定义复制策略编译并打包到 .jar 文件中(完整源代码可在此处找到):
public class PrefixlessReplicationPolicy extends DefaultReplicationPolicy {

  private static final Logger log = LoggerFactory.getLogger(PrefixlessReplicationPolicy.class);

  private String sourceClusterAlias;

  @Override
  public void configure(Map<String, ?> props) {
    super.configure(props);
    sourceClusterAlias = (String) props.get(MirrorConnectorConfig.SOURCE_CLUSTER_ALIAS);
    if (sourceClusterAlias == null) {
      String logMessage = String.format("Property %s not found", MirrorConnectorConfig.SOURCE_CLUSTER_ALIAS);
      log.error(logMessage);
      throw new RuntimeException(logMessage);
    }
  }

  @Override
  public String formatRemoteTopic(String sourceClusterAlias, String topic) {
    return topic;
  }

  @Override
  public String topicSource(String topic) {
    return topic == null ? null : sourceClusterAlias;
  }

  @Override
  public String upstreamTopic(String topic) {
    return null;
  }
}
Run Code Online (Sandbox Code Playgroud)
  1. 将 .jar 复制到${KAFKA_HOME/libs目录中
  2. 通过replication.policy.class${KAFKA_HOME}/config/mm2.properties以下位置设置属性,将 Mirror Maker 2 配置为使用该复制策略:
  replication.policy.class=ch.mawileo.kafka.mm2.PrefixlessReplicationPolicy
Run Code Online (Sandbox Code Playgroud)

  • 未来看来,这将不再是必要的。截至 2021 年 7 月,IdentityReplicationPolicy 已添加到 MirrorMaker2。它似乎还不是发布的一部分。https://github.com/apache/kafka/blob/trunk/connect/mirror-client/src/main/java/org/apache/kafka/connect/mirror/IdentityReplicationPolicy.java (4认同)
  • @Naremy,您好,“其他方法”是指“topicSource”和“upstreamTopic”吗?当我保留这些方法不变时,主题属性的镜像不起作用。例如,在我的 Kafka 设置中,默认情况下主题不会被压缩;当我创建一个想要压缩的主题时(因此我为该主题设置了“cleanup.policy=compact”属性),下游侧的镜像主题仍然具有默认属性(即不压缩)。主题属性的镜像适用于您的解决方案吗? (2认同)

小智 8

我能够使用此设置删除前缀:

"replication.policy.separator": ""
"source.cluster.alias": "",
"target.cluster.alias": "",
Run Code Online (Sandbox Code Playgroud)

如果您的情况需要别名设置,我知道您应该使用其他 replicationPolicy 类。默认使用 DefaultReplicationPolicy 类(https://kafka.apache.org/24/javadoc/org/apache/kafka/connect/mirror/DefaultReplicationPolicy.html


小智 6

我认为上面的回答不恰当。

在Mirror Maker 2.0中,如果你想保持主题不被修改,你必须实现ReplicationPolicy。

您可以参考 DefaultReplicationPolicy.class ,然后覆盖formatRemoteTopic(),之后您必须删除sourceClusterAlias + separator。最后在配置replication.policy.classmm2.properties

我定义了MigrationReplicationPolicy.class

replication.policy.class = org.apache.kafka.connect.mirror.MigrationReplicationPolicy
Run Code Online (Sandbox Code Playgroud)

你应该看看MirrorClientConfig,class,我知道你会理解的

  • 您可以发布您创建的自定义“MigrationReplicationPolicy.class”的链接吗? (3认同)