Debezium,Kafka 连接:有没有办法只发送有效负载而不发送模式?

Thr*_*ots 4 apache-kafka apache-kafka-connect debezium

我在 kafka connect 中有一个发件箱 postgresql 表和 debezium 连接器,它根据表中添加的行创建 kafka 消息。

我面临的问题是消息格式。这是创建的消息值:

{
  "schema": {
    "type": "string",
    "optional": true,
    "name": "io.debezium.data.Json",
    "version": 1
  },
  "payload": "{\"foo\": \"bar\"}"
}
Run Code Online (Sandbox Code Playgroud)

但是(因为消费者)我需要消息仅包含有效负载,如下所示:

{
  "\"foo\": \"bar\""
}
Run Code Online (Sandbox Code Playgroud)

这是我的卡夫卡连接器配置的一部分:

"transforms": "outbox",
"transforms.outbox.type": "io.debezium.transforms.outbox.EventRouter",
"transforms.outbox.route.topic.replacement": "${routedByValue}",
"transforms.outbox.route.by.field": "aggregate_type",
"transforms.outbox.table.field.event.payload.id": "aggregate_id",
"transforms.outbox.table.fields.additional.placement": "payload_type:header:__TypeId__"
Run Code Online (Sandbox Code Playgroud)

有没有办法在不创建自定义变压器的情况下实现这一目标?

Rob*_*att 6

看起来您正在使用org.apache.kafka.connect.json.JsonConverterwithschemas.enable=true作为您的值转换器。当您执行此操作时,它将架构与消息中的有效负载一起嵌入。

\n

如果您设置了,value.converter.schemas.enable=false您应该只获取消息中的有效负载。

\n

参考:Kafka Connect:转换器和序列化解释 \xe2\x80\x94 JSON 和模式

\n