我们可以在 mongodb 中更新/更新记录,但是如果有任何方法或函数可以直接在 mongodb 中更新或更新文档,并且源系统是 kafka,目标是 mongodb。
我已经设置了 Kafka 集群并安装confluent-hub在EC2实例上。我已经下载了confluent-hubtar 文件,解压并将二进制文件放入/usr/local/bin.
当我现在尝试安装任何连接器时,我收到 ClassNotFoundException。
[ec2-user@ip-172-31-88-110 bin]$ confluent-hub install confluentinc/kafka-connect-jdb c:latest --component-dir /opt/connectors --worker-configs /etc/kafka/connect.properties
/usr/local/bin/confluent-hub: line 13: cd: /usr/local/bin/../share/java: No such file or directory
Error: Could not find or load main class io.confluent.connect.hub.cli.ConfluentHubClient
Caused by: java.lang.ClassNotFoundException: io.confluent.connect.hub.cli.ConfluentHubClient
Run Code Online (Sandbox Code Playgroud)
我哪里出错了?
apache-kafka-connect mongodb-kafka-connector confluent-platform
如何使用 Kafka Connect 从 kafka 消息中检索传入标头,以使用 MongoDB Sink Connector 到 mongodb 将它们存储为附加数据字段。
我有一个卡夫卡主题“PROJECT_EXAMPLE_TOPIC”。如您所见,我已经能够保存消息时间戳、传入消息数据和 mongo 文档创建/更新日期。
我猜想有一个函数可以在某处提取标头。
卡夫卡值示例
// incoming kafka value
{
"msgId" : "exampleId"
}
Run Code Online (Sandbox Code Playgroud)
//expected example
{
"_id" : ObjectId("5f83869c1ad2db246fa25a5a"),
"_insertedTS" : ISODate("2020-10-11T22:26:36.051Z"),
"_modifiedTS" : ISODate("2020-10-11T22:26:36.051Z"),
"message_source" : "mongo_connector",
"message_timestamp" : ISODate("2020-09-28T21:50:54.940Z"),
"message_topic" : "PROJECT_EXAMPLE_TOPIC",
"msgId" : "exampleId",
"message_header_foo" : "header_foo_value"
}
Run Code Online (Sandbox Code Playgroud)
//expected example
{
"_id" : ObjectId("5f83869c1ad2db246fa25a5a"),
"_insertedTS" : ISODate("2020-10-11T22:26:36.051Z"),
"_modifiedTS" : ISODate("2020-10-11T22:26:36.051Z"),
"message_source" : "mongo_connector",
"message_timestamp" : ISODate("2020-09-28T21:50:54.940Z"),
"message_topic" : "PROJECT_EXAMPLE_TOPIC",
"msgId" …Run Code Online (Sandbox Code Playgroud) 我正在使用带有 Kafka-connect 的 mongo-source 插件。我检查了源任务状态,它正在运行并监听 mongo 集合。
我手动停止了 mongod 服务并等待了大约 1 分钟,然后我再次启动它。
我检查了源任务,看看是否有任何东西会自行修复,30 分钟后似乎没有任何效果。
只有在重新启动连接器后,它才再次开始工作。
由于 mongo-source 没有设置超时时重试+退避的选项,我搜索了一个适合简单场景的配置:使用 Kafka-connect 配置在 X 时间后重启失败的任务。找不到任何.. :/ 我可以用一个简单的脚本来做到这一点,但是 Kafka-connect 中必须有一些东西来管理失败的任务。甚至在 mongo-source 中……我不希望它在 1 分钟后就失败得这么快……:/
我有一个已经在运行的生产部署了 Kafka-Cluster 并且有主题“现有主题”。我正在使用 Debezium 的 MongoDB-Source-Connector。
在这里,我想要的只是将 CDC 事件直接推送到主题“现有主题”,以便我的已经在收听该主题的消费者能够处理它。
我没有找到任何资源可以这样做,但是有人提到该主题是以以下格式创建的 -
“如果你的mongodb.name参数是A,数据库名称是B,集合名称是C,那么数据库A和集合C的数据将被加载到主题ABC下”
我可以将主题更改为“现有主题”并将事件推送给它吗?
我是 Kafka 的新手,我想看看是否可以使用 Kafka 将 MongoDb 数据与另一个系统同步。
我的设置:
/usr/local/share/kafka/plugins../bin/connect-standalone.sh ./config/connect-standalone.properties /usr/local/share/kafka/plugins/mongodb-kafka-connect-mongodb-1.3.0/etc/MongoSourceConnector.properties
Run Code Online (Sandbox Code Playgroud)
[2020-10-17 13:57:22,304] INFO Registered loader: PluginClassLoader{pluginLocation=file:/usr/local/share/kafka/plugins/mongodb-kafka-connect-mongodb-1.3.0/} (org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader:264)
[2020-10-17 13:57:22,305] INFO Added plugin 'com.mongodb.kafka.connect.MongoSourceConnector' (org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader:193)
[2020-10-17 13:57:22,305] INFO Added plugin 'com.mongodb.kafka.connect.MongoSinkConnector' (org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader:193)
Run Code Online (Sandbox Code Playgroud)
Archive: mongodb-kafka-connect-mongodb-1.3.0.zip
creating: /usr/local/share/kafka/plugins/mongodb-kafka-connect-mongodb-1.3.0/
creating: /usr/local/share/kafka/plugins/mongodb-kafka-connect-mongodb-1.3.0/etc/
inflating: /usr/local/share/kafka/plugins/mongodb-kafka-connect-mongodb-1.3.0/etc/MongoSourceConnector.properties
inflating: /usr/local/share/kafka/plugins/mongodb-kafka-connect-mongodb-1.3.0/etc/MongoSinkConnector.properties
creating: /usr/local/share/kafka/plugins/mongodb-kafka-connect-mongodb-1.3.0/doc/
inflating: /usr/local/share/kafka/plugins/mongodb-kafka-connect-mongodb-1.3.0/doc/README.md
inflating: /usr/local/share/kafka/plugins/mongodb-kafka-connect-mongodb-1.3.0/doc/LICENSE.txt
inflating: /usr/local/share/kafka/plugins/mongodb-kafka-connect-mongodb-1.3.0/manifest.json
creating: /usr/local/share/kafka/plugins/mongodb-kafka-connect-mongodb-1.3.0/lib/
inflating: /usr/local/share/kafka/plugins/mongodb-kafka-connect-mongodb-1.3.0/lib/mongo-kafka-1.3.0-all.jar
creating: /usr/local/share/kafka/plugins/mongodb-kafka-connect-mongodb-1.3.0/assets/
inflating: /usr/local/share/kafka/plugins/mongodb-kafka-connect-mongodb-1.3.0/assets/mongodb-leaf.png
inflating: /usr/local/share/kafka/plugins/mongodb-kafka-connect-mongodb-1.3.0/assets/mongodb-logo.png
Run Code Online (Sandbox Code Playgroud)
这个插件来自 …
mongodb apache-kafka apache-kafka-connect mongodb-kafka-connector