How to transform and extract fields in Kafka sink JDBC connector

Gio*_*ous 6 apache-kafka apache-kafka-connect

I am using a 3rd party CDC tool that replicates data from a source database into Kafka topics. An example row is shown below:

{  
   "data":{  
      "USER_ID":{  
         "string":"1"
      },
      "USER_CATEGORY":{  
         "string":"A"
      }
   },
   "beforeData":{  
      "Data":{  
         "USER_ID":{  
            "string":"1"
         },
         "USER_CATEGORY":{  
            "string":"B"
         }
      }
   },
   "headers":{  
      "operation":"UPDATE",
      "timestamp":"2018-05-03T13:53:43.000"
   }
}

What configuration is needed in the sink file in order to extract all of the (sub)fields under data and headers, ignore those under beforeData, and have the target table that the Kafka sink writes to contain the following fields:

USER_ID, USER_CATEGORY, operation, timestamp

I went through the list of transformations in Confluent's docs, but I could not work out how to use them to achieve this.

cri*_*007 5

I think you want ExtractField, and unfortunately it is a Map.get operation, which means that 1) nested fields cannot be extracted in one pass and 2) multiple fields require multiple chained transforms.

That being said, you might try the following (untested):

transforms=ExtractData,ExtractHeaders
transforms.ExtractData.type=org.apache.kafka.connect.transforms.ExtractField$Value
transforms.ExtractData.field=data
transforms.ExtractHeaders.type=org.apache.kafka.connect.transforms.ExtractField$Value
transforms.ExtractHeaders.field=headers

If that doesn't work, you would be better off implementing your own transformation package, one that can at least drop values from a Struct/Map.
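
As a rough illustration of that suggestion, a minimal custom Single Message Transform that drops one configured top-level field from a schemaless (Map) record value might look like the sketch below. This is untested; the package, class name, and "field" config name are made up for the example, and a real implementation would also need to handle Struct values with schemas.

package com.example.smt;  // hypothetical package

import java.util.HashMap;
import java.util.Map;

import org.apache.kafka.common.config.ConfigDef;
import org.apache.kafka.connect.connector.ConnectRecord;
import org.apache.kafka.connect.transforms.Transformation;

// Drops a single configured top-level field from a schemaless (Map) record value.
public class DropField<R extends ConnectRecord<R>> implements Transformation<R> {

    private static final ConfigDef CONFIG_DEF = new ConfigDef()
            .define("field", ConfigDef.Type.STRING, ConfigDef.Importance.HIGH,
                    "Name of the top-level field to drop from the record value");

    private String fieldName;

    @Override
    public void configure(Map<String, ?> props) {
        fieldName = (String) props.get("field");
    }

    @Override
    public R apply(R record) {
        // Only schemaless Map values are handled here; Struct values would
        // require rebuilding the value schema without the dropped field.
        if (!(record.value() instanceof Map)) {
            return record;
        }
        @SuppressWarnings("unchecked")
        Map<String, Object> value = new HashMap<>((Map<String, Object>) record.value());
        value.remove(fieldName);
        return record.newRecord(record.topic(), record.kafkaPartition(),
                record.keySchema(), record.key(), null, value, record.timestamp());
    }

    @Override
    public ConfigDef config() {
        return CONFIG_DEF;
    }

    @Override
    public void close() {
    }
}

Packaged as a jar on the Connect worker's plugin.path, it could then be wired in with something like transforms.DropBefore.type=com.example.smt.DropField and transforms.DropBefore.field=beforeData.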


Mar*_*lee 5

If you're willing to list the specific field names, you can solve this by:

  1. Using a Flatten transform to collapse the nesting (which converts the paths through the original structure into dot-delimited names)
  2. Using a ReplaceField transform with renames to make the field names what you want the sink to emit
  3. Using another ReplaceField transform with whitelist to restrict the emitted fields to the ones you choose

For your case, that might look like this:

  "transforms": "t1,t2,t3",
  "transforms.t1.type": "org.apache.kafka.connect.transforms.Flatten$Value",
  "transforms.t2.type": "org.apache.kafka.connect.transforms.ReplaceField$Value",
  "transforms.t2.renames": "data.USER_ID:USER_ID,data.USER_CATEGORY:USER_CATEGORY,headers.operation:operation,headers.timestamp:timestamp",
  "transforms.t3.type": "org.apache.kafka.connect.transforms.ReplaceField$Value",
  "transforms.t3.whitelist": "USER_ID,USER_CATEGORY,operation,timestamp",