标签: mongodb-kafka-connector

我们可以在 mongodb 中更新/更新记录吗?数据源是kafka

我们可以在 mongodb 中更新/更新记录,但是如果有任何方法或函数可以直接在 mongodb 中更新或更新文档,并且源系统是 kafka,目标是 mongodb。

apache-kafka apache-kafka-connect mongodb-kafka-connector

8
推荐指数
1
解决办法
1400
查看次数

安装 kafka 连接器时出现 ConfluenceHubClient 异常

我已经设置了 Kafka 集群并安装confluent-hubEC2实例上。我已经下载了confluent-hubtar 文件,解压并将二进制文件放入/usr/local/bin.

当我现在尝试安装任何连接器时,我收到 ClassNotFoundException。

[ec2-user@ip-172-31-88-110 bin]$ confluent-hub install confluentinc/kafka-connect-jdb c:latest --component-dir /opt/connectors --worker-configs /etc/kafka/connect.properties

/usr/local/bin/confluent-hub: line 13: cd: /usr/local/bin/../share/java: No such file or directory
Error: Could not find or load main class io.confluent.connect.hub.cli.ConfluentHubClient
Caused by: java.lang.ClassNotFoundException: io.confluent.connect.hub.cli.ConfluentHubClient
Run Code Online (Sandbox Code Playgroud)

我哪里出错了?

apache-kafka-connect mongodb-kafka-connector confluent-platform

8
推荐指数
2
解决办法
2966
查看次数

如何使用 MongoDB 在 Kafka Connect Sink 连接器中获取 kafka 消息的标头

如何使用 Kafka Connect 从 kafka 消息中检索传入标头,以使用 MongoDB Sink Connector 到 mongodb 将它们存储为附加数据字段。

我有一个卡夫卡主题“PROJECT_EXAMPLE_TOPIC”。如您所见,我已经能够保存消息时间戳、传入消息数据和 mongo 文档创建/更新日期。

我猜想有一个函数可以在某处提取标头。

卡夫卡值示例

  // incoming kafka value
  {
    "msgId" : "exampleId"
  }
Run Code Online (Sandbox Code Playgroud)
  1. 如何获取原始标头header_foo

  //expected example
  {
  
    "_id" : ObjectId("5f83869c1ad2db246fa25a5a"),
    "_insertedTS" : ISODate("2020-10-11T22:26:36.051Z"),
    "_modifiedTS" : ISODate("2020-10-11T22:26:36.051Z"),
    "message_source" : "mongo_connector",
    "message_timestamp" : ISODate("2020-09-28T21:50:54.940Z"),
    "message_topic" : "PROJECT_EXAMPLE_TOPIC",
    "msgId" : "exampleId",
    "message_header_foo" : "header_foo_value"
   }


Run Code Online (Sandbox Code Playgroud)
  1. 如何获取所有kafka标头?
  //expected example
  {
    "_id" : ObjectId("5f83869c1ad2db246fa25a5a"),
    "_insertedTS" : ISODate("2020-10-11T22:26:36.051Z"),
    "_modifiedTS" : ISODate("2020-10-11T22:26:36.051Z"),
    "message_source" : "mongo_connector",
    "message_timestamp" : ISODate("2020-09-28T21:50:54.940Z"),
    "message_topic" : "PROJECT_EXAMPLE_TOPIC",
    "msgId" …
Run Code Online (Sandbox Code Playgroud)

apache-kafka apache-kafka-connect mongodb-kafka-connector

6
推荐指数
1
解决办法
3485
查看次数

在 Kafka-Connect 中自动重新连接失败的任务

我正在使用带有 Kafka-connect 的 mongo-source 插件。我检查了任务状态,它正在运行并监听 mongo 集合。

我手动停止了 mongod 服务并等待了大约 1 分钟,然后我再次启动它。

我检查了源任务,看看是否有任何东西会自行修复,30 分钟后似乎没有任何效果。

只有在重新启动连接器后,它才再次开始工作。

由于 mongo-source 没有设置超时时重试+退避的选项,我搜索了一个适合简单场景的配置:使用 Kafka-connect 配置在 X 时间后重启失败的任务。找不到任何.. :/ 我可以用一个简单的脚本来做到这一点,但是 Kafka-connect 中必须有一些东西来管理失败的任务。甚至在 mongo-source 中……我不希望它在 1 分钟后就失败得这么快……:/

apache-kafka apache-kafka-connect mongodb-kafka-connector

5
推荐指数
1
解决办法
2015
查看次数

如何更改Kafka Connect Source Connector生成的主题名称

我有一个已经在运行的生产部署了 Kafka-Cluster 并且有主题“现有主题”。我正在使用 Debezium 的 MongoDB-Source-Connector。

在这里,我想要的只是将 CDC 事件直接推送到主题“现有主题”,以便我的已经在收听该主题的消费者能够处理它。

我没有找到任何资源可以这样做,但是有人提到该主题是以以下格式创建的 -

“如果你的mongodb.name参数是A,数据库名称是B,集合名称是C,那么数据库A和集合C的数据将被加载到主题ABC下”

我可以将主题更改为“现有主题”并将事件推送给它吗?

apache-kafka apache-kafka-connect mongodb-kafka-connector

5
推荐指数
1
解决办法
1780
查看次数

无法使用 Apache Kafka 使用 MongoDb 插件启动 Kafka Connect

我是 Kafka 的新手,我想看看是否可以使用 Kafka 将 MongoDb 数据与另一个系统同步。

我的设置:

  1. 我正在运行 AWS MSK 集群,并且手动创建了一个带有 Kafka 客户端的 EC2 实例。
  2. 我已将 MongoDB Kafka Connect 插件添加到/usr/local/share/kafka/plugins.
  3. 我正在运行 Kafka connect,可以看到它加载了插件
./bin/connect-standalone.sh ./config/connect-standalone.properties /usr/local/share/kafka/plugins/mongodb-kafka-connect-mongodb-1.3.0/etc/MongoSourceConnector.properties
Run Code Online (Sandbox Code Playgroud)
[2020-10-17 13:57:22,304] INFO Registered loader: PluginClassLoader{pluginLocation=file:/usr/local/share/kafka/plugins/mongodb-kafka-connect-mongodb-1.3.0/} (org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader:264)
[2020-10-17 13:57:22,305] INFO Added plugin 'com.mongodb.kafka.connect.MongoSourceConnector' (org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader:193)
[2020-10-17 13:57:22,305] INFO Added plugin 'com.mongodb.kafka.connect.MongoSinkConnector' (org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader:193)
Run Code Online (Sandbox Code Playgroud)
  1. 解压后的插件有这个结构
Archive:  mongodb-kafka-connect-mongodb-1.3.0.zip
   creating: /usr/local/share/kafka/plugins/mongodb-kafka-connect-mongodb-1.3.0/
   creating: /usr/local/share/kafka/plugins/mongodb-kafka-connect-mongodb-1.3.0/etc/
  inflating: /usr/local/share/kafka/plugins/mongodb-kafka-connect-mongodb-1.3.0/etc/MongoSourceConnector.properties  
  inflating: /usr/local/share/kafka/plugins/mongodb-kafka-connect-mongodb-1.3.0/etc/MongoSinkConnector.properties  
   creating: /usr/local/share/kafka/plugins/mongodb-kafka-connect-mongodb-1.3.0/doc/
  inflating: /usr/local/share/kafka/plugins/mongodb-kafka-connect-mongodb-1.3.0/doc/README.md  
  inflating: /usr/local/share/kafka/plugins/mongodb-kafka-connect-mongodb-1.3.0/doc/LICENSE.txt  
  inflating: /usr/local/share/kafka/plugins/mongodb-kafka-connect-mongodb-1.3.0/manifest.json  
   creating: /usr/local/share/kafka/plugins/mongodb-kafka-connect-mongodb-1.3.0/lib/
  inflating: /usr/local/share/kafka/plugins/mongodb-kafka-connect-mongodb-1.3.0/lib/mongo-kafka-1.3.0-all.jar  
   creating: /usr/local/share/kafka/plugins/mongodb-kafka-connect-mongodb-1.3.0/assets/
  inflating: /usr/local/share/kafka/plugins/mongodb-kafka-connect-mongodb-1.3.0/assets/mongodb-leaf.png  
  inflating: /usr/local/share/kafka/plugins/mongodb-kafka-connect-mongodb-1.3.0/assets/mongodb-logo.png  
Run Code Online (Sandbox Code Playgroud)

这个插件来自 …

mongodb apache-kafka apache-kafka-connect mongodb-kafka-connector

5
推荐指数
1
解决办法
652
查看次数