小编lin*_*fix的帖子

如何使用 MongoDB 在 Kafka Connect Sink 连接器中获取 kafka 消息的标头

如何使用 Kafka Connect 从 kafka 消息中检索传入标头,以使用 MongoDB Sink Connector 到 mongodb 将它们存储为附加数据字段。

我有一个卡夫卡主题“PROJECT_EXAMPLE_TOPIC”。如您所见,我已经能够保存消息时间戳、传入消息数据和 mongo 文档创建/更新日期。

我猜想有一个函数可以在某处提取标头。

卡夫卡值示例

  // incoming kafka value
  {
    "msgId" : "exampleId"
  }
Run Code Online (Sandbox Code Playgroud)
  1. 如何获取原始标头header_foo

  //expected example
  {
  
    "_id" : ObjectId("5f83869c1ad2db246fa25a5a"),
    "_insertedTS" : ISODate("2020-10-11T22:26:36.051Z"),
    "_modifiedTS" : ISODate("2020-10-11T22:26:36.051Z"),
    "message_source" : "mongo_connector",
    "message_timestamp" : ISODate("2020-09-28T21:50:54.940Z"),
    "message_topic" : "PROJECT_EXAMPLE_TOPIC",
    "msgId" : "exampleId",
    "message_header_foo" : "header_foo_value"
   }


Run Code Online (Sandbox Code Playgroud)
  1. 如何获取所有kafka标头?
  //expected example
  {
    "_id" : ObjectId("5f83869c1ad2db246fa25a5a"),
    "_insertedTS" : ISODate("2020-10-11T22:26:36.051Z"),
    "_modifiedTS" : ISODate("2020-10-11T22:26:36.051Z"),
    "message_source" : "mongo_connector",
    "message_timestamp" : ISODate("2020-09-28T21:50:54.940Z"),
    "message_topic" : "PROJECT_EXAMPLE_TOPIC",
    "msgId" …
Run Code Online (Sandbox Code Playgroud)

apache-kafka apache-kafka-connect mongodb-kafka-connector

6
推荐指数
1
解决办法
3485
查看次数