Kafka Rest 代理 JSON 模式验证

llo*_*ono 5 jsonschema apache-kafka confluent-schema-registry confluent-rest-proxy

我想通过 Kafka Rest Proxy 生成一个 kafka 主题。我已经在架构注册表中创建了一个 JSON 架构,我希望所有消息都根据已注册的架构进行验证,如果它们与架构不匹配,则将其拒绝。

我的架构

{
  "type": "object",
  "properties": {
    "foo": {
      "type": "string",
    },
    "bar": {
      "type": "number" 
    }
  }
}
Run Code Online (Sandbox Code Playgroud)

该架构已正确注册并分配版本 1。然后我尝试为两者生成数据类型错误的消息foobar但该消息被接受。

curl --location --request POST 'http://localhost:8082/topics/test' \
--header 'Content-Type: application/vnd.kafka.jsonschema.v2+json' \
--header 'Accept: application/vnd.kafka.v2+json' \
--data-raw '{
    "value_schema_id": 1,
    "records": [
        {
            "value": {
                "foo": 10,
                "bar":"not a number"
            }
        }
    ]
}'
Run Code Online (Sandbox Code Playgroud)

请注意,我正在生成test具有关联模式的主题,但无论如何都会接受错误消息。我还尝试添加"value_schema_id": 1以确保有效负载中引用了架构,但错误消息仍然被接受。

但是,如果我传递 JSON 架构,因为value_schema它按预期工作

{
    "value_schema": "{\"type\": \"object\",\"properties\": {\"foo\": {\"type\": \"string\"},\"bar\": {\"type\": \"number\"}}}",
    "records": [
        {
            "value": {
                "foo": "10",
                "bar": "1"
            }
        }
    ]
}
Run Code Online (Sandbox Code Playgroud)

回复

{
    "error_code": 42203,
    "message": "Conversion of JSON to Object failed: Failed to convert JSON using JSON Schema: #/bar: expected type: Number, found: String"
}
Run Code Online (Sandbox Code Playgroud)

问题:是否可以在生成消息时引用现有模式 id,而不必每次都传递整个 JSON 模式?

小智 2

是的,这是可能的。您必须将其启用为值、键或两者,如下所示:

confluent.value.schema.validation=true 
confluent.key.schema.validation=true
Run Code Online (Sandbox Code Playgroud)

此外,您可以在 JSON 请求中指定架构的 ID。以下面为例,来说明我的意思:

{
  "key_schema_id": 1234,
  "value_schema_id: 56789,
  "records": [
    {
      "key": "key123",
      "value": "my_simple_string_value_example"
    }
  ]
}
Run Code Online (Sandbox Code Playgroud)

在哪里:

key_schema_id

是架构注册表中关键架构的 ID。通过为其设置适当的值,将保证只有具有符合该 ID 标识的模式的密钥的消息才会被接受。

相似地:

值模式 ID

是架构注册表中值架构的 ID。通过为其设置适当的值,将保证仅接受其值符合该 ID 标识的模式的消息。

希望这有助于阐明一些道理。

干杯,

爱德华多·庞佐尼

  • 好一个。我正在使用 Strimzi - Kubernetes 上的 Apache Kafka 以及开源 Rest 代理和 Schema 注册表 atm。我尝试在主题级别设置这些属性,但失败并显示未知主题配置。更多的互联网搜索告诉我这些属性仅对 Confluence 有效。是否有任何解决方法可以在写入时/当其余代理写入 Kafka 时完成模式验证? (2认同)