如何更改Kafka Connect Source Connector生成的主题名称

Abh*_*ece 5 apache-kafka apache-kafka-connect mongodb-kafka-connector

我有一个已经在运行的生产部署了 Kafka-Cluster 并且有主题“现有主题”。我正在使用 Debezium 的 MongoDB-Source-Connector。

在这里,我想要的只是将 CDC 事件直接推送到主题“现有主题”,以便我的已经在收听该主题的消费者能够处理它。

我没有找到任何资源可以这样做,但是有人提到该主题是以以下格式创建的 -

“如果你的mongodb.name参数是A,数据库名称是B,集合名称是C,那么数据库A和集合C的数据将被加载到主题ABC下”

我可以将主题更改为“现有主题”并将事件推送给它吗?

Gio*_*ous 4

根据文档

Kafka 主题的名称始终采用 形式 logicalName.databaseName.collectionName,其中logicalName是 使用 配置属性指定的连接器的逻辑名称,是发生操作的数据库的名称,是受影响文档所在的 MongoDB 集合的名称存在。mongodb.namedatabaseNamecollectionName


这意味着如果您的连接器的逻辑名称是myConnector并且您的数据库myDatabase有两个集合users并且orders

{
  "name": "myConnector",  
  "config": {
    "connector.class": "io.debezium.connector.mongodb.MongoDbConnector", 
    "mongodb.hosts": "mongo-db-host:27017", 
    "mongodb.name": "myDatabase", 
    "collection.whitelist": "myDatabase[.]*", 
  }
}
Run Code Online (Sandbox Code Playgroud)

然后 Kafka Connect 将使用名称填充两个主题:

  • myConnector.myDatabase.users
  • myConnector.myDatabase.orders

现在,如果您仍然想更改目标主题的名称,可以使用 Kafka Connect 单消息转换 (SMT)。更准确地说,ExtractTopic应该对你有帮助。请注意,尽管此 SMT 可以帮助您从消息的键或值中提取主题名称,因此您需要在负载中包含所需的主题名称。

例如,以下 SMT 将提取字段的值myField并将其用作记录的主题:

 transforms.ValueFieldExample.type=io.confluent.connect.transforms.ExtractTopic$Value
 transforms.ValueFieldExample.field=myField
Run Code Online (Sandbox Code Playgroud)