Abh*_*ece 5 apache-kafka apache-kafka-connect mongodb-kafka-connector
我有一个已经在运行的生产部署了 Kafka-Cluster 并且有主题“现有主题”。我正在使用 Debezium 的 MongoDB-Source-Connector。
在这里,我想要的只是将 CDC 事件直接推送到主题“现有主题”,以便我的已经在收听该主题的消费者能够处理它。
我没有找到任何资源可以这样做,但是有人提到该主题是以以下格式创建的 -
“如果你的mongodb.name参数是A,数据库名称是B,集合名称是C,那么数据库A和集合C的数据将被加载到主题ABC下”
我可以将主题更改为“现有主题”并将事件推送给它吗?
根据文档,
Kafka 主题的名称始终采用 形式
logicalName.databaseName.collectionName
,其中logicalName
是 使用 配置属性指定的连接器的逻辑名称,是发生操作的数据库的名称,是受影响文档所在的 MongoDB 集合的名称存在。mongodb.name
databaseName
collectionName
这意味着如果您的连接器的逻辑名称是myConnector
并且您的数据库myDatabase
有两个集合users
并且orders
{
"name": "myConnector",
"config": {
"connector.class": "io.debezium.connector.mongodb.MongoDbConnector",
"mongodb.hosts": "mongo-db-host:27017",
"mongodb.name": "myDatabase",
"collection.whitelist": "myDatabase[.]*",
}
}
Run Code Online (Sandbox Code Playgroud)
然后 Kafka Connect 将使用名称填充两个主题:
myConnector.myDatabase.users
myConnector.myDatabase.orders
现在,如果您仍然想更改目标主题的名称,可以使用 Kafka Connect 单消息转换 (SMT)。更准确地说,ExtractTopic
应该对你有帮助。请注意,尽管此 SMT 可以帮助您从消息的键或值中提取主题名称,因此您需要在负载中包含所需的主题名称。
例如,以下 SMT 将提取字段的值myField
并将其用作记录的主题:
transforms.ValueFieldExample.type=io.confluent.connect.transforms.ExtractTopic$Value
transforms.ValueFieldExample.field=myField
Run Code Online (Sandbox Code Playgroud)
归档时间: |
|
查看次数: |
1780 次 |
最近记录: |