如何使用 Kafka Connect 从 kafka 消息中检索传入标头,以使用 MongoDB Sink Connector 到 mongodb 将它们存储为附加数据字段。
我有一个卡夫卡主题“PROJECT_EXAMPLE_TOPIC”。如您所见,我已经能够保存消息时间戳、传入消息数据和 mongo 文档创建/更新日期。
我猜想有一个函数可以在某处提取标头。
卡夫卡值示例
// incoming kafka value
{
"msgId" : "exampleId"
}
Run Code Online (Sandbox Code Playgroud)
//expected example
{
"_id" : ObjectId("5f83869c1ad2db246fa25a5a"),
"_insertedTS" : ISODate("2020-10-11T22:26:36.051Z"),
"_modifiedTS" : ISODate("2020-10-11T22:26:36.051Z"),
"message_source" : "mongo_connector",
"message_timestamp" : ISODate("2020-09-28T21:50:54.940Z"),
"message_topic" : "PROJECT_EXAMPLE_TOPIC",
"msgId" : "exampleId",
"message_header_foo" : "header_foo_value"
}
Run Code Online (Sandbox Code Playgroud)
//expected example
{
"_id" : ObjectId("5f83869c1ad2db246fa25a5a"),
"_insertedTS" : ISODate("2020-10-11T22:26:36.051Z"),
"_modifiedTS" : ISODate("2020-10-11T22:26:36.051Z"),
"message_source" : "mongo_connector",
"message_timestamp" : ISODate("2020-09-28T21:50:54.940Z"),
"message_topic" : "PROJECT_EXAMPLE_TOPIC",
"msgId" …Run Code Online (Sandbox Code Playgroud)