从 kafka 消费者读取数据时,如何从用于 avro 记录的模式注册表中查找模式 id

Hem*_*mar 7 python apache-kafka kafka-consumer-api kafka-python confluent-schema-registry

我们使用 schema 注册表来存储 schema,消息被序列化到 avro 并推送到 kafka 主题。

想知道,当从消费者读取数据时,如何找到 avro 记录被序列化的 schema id。我们需要此架构 ID 来跟踪是否将新列添加到表中的更改。如果添加或删除新列,架构注册表中将生成新的架构 id,以及如何在消费者中获取该 id。

consumer = KafkaConsumer(bootstrap_servers = conf['BOOTSTRAP_SERVERS'],
                        auto_offset_reset = conf['AUTO_OFFSET'],
                        enable_auto_commit = conf['AUTO_COMMIT'],
                        auto_commit_interval_ms = conf['AUTO_COMMIT_INTERVAL']
                        )
consumer.subscribe(conf['KAFKA_TOPICS'])

for message in consumer:
    print(message.key)
Run Code Online (Sandbox Code Playgroud)

从上面的代码中,message.key 打印该特定记录的键,以及我们如何找到消费者用来反序列化记录的相应模式 ID。

curl -X GET http://localhost:8081/subjects/helpkit_internal.helpkit_support.agents-value/versions/2

{"subject":"helpkit_internal.helpkit_support.agents-value","version":2,"id":33,"schema":"{\"type\":\"record\",\"name\":\"Envelope\",\"namespace\":\"helpkit_internal.helpkit_support.agents\",\"fields\":[{\"name\":\"before\",\"type\":[\"null\",{\"type\":\"record\",\"name\":\"Value\",\"fields\":[{\"name\":\"id\",\"type\":\"long\"},{\"name\":\"user_id\"
Run Code Online (Sandbox Code Playgroud)

我们想要从消费者那里获取 id 值"id":33

请就此提出建议。

Gio*_*ous 1

您实际上可以做的是获取给定主题的主题的最新模式 id:

使用confluent-kafka-python

from confluent_kafka.avro.cached_schema_registry_client import CachedSchemaRegistryClient

sr = CachedSchemaRegistryClient({
    'url': 'http://localhost:8081',
    'ssl.certificate.location': '/path/to/cert',  # optional
    'ssl.key.location': '/path/to/key'  # optional
})

value_schema = sr.get_latest_schema("helpkit_internal.helpkit_support.agents-value")[1]
key_schema= sr.get_latest_schema("helpkit_internal.helpkit_support.agents-key")[1]
Run Code Online (Sandbox Code Playgroud)

使用SchemaRegistryClient

通过主题名称获取模式

from schema_registry.client import SchemaRegistryClient


sr = SchemaRegistryClient('localhost:8081')
my_schema = sr.get_schema(subject='shelpkit_internal.helpkit_support.agents-value', version='latest')
Run Code Online (Sandbox Code Playgroud)