我们使用 schema 注册表来存储 schema,消息被序列化到 avro 并推送到 kafka 主题。
想知道,当从消费者读取数据时,如何找到 avro 记录被序列化的 schema id。我们需要此架构 ID 来跟踪是否将新列添加到表中的更改。如果添加或删除新列,架构注册表中将生成新的架构 id,以及如何在消费者中获取该 id。
consumer = KafkaConsumer(bootstrap_servers = conf['BOOTSTRAP_SERVERS'],
auto_offset_reset = conf['AUTO_OFFSET'],
enable_auto_commit = conf['AUTO_COMMIT'],
auto_commit_interval_ms = conf['AUTO_COMMIT_INTERVAL']
)
consumer.subscribe(conf['KAFKA_TOPICS'])
for message in consumer:
print(message.key)
Run Code Online (Sandbox Code Playgroud)
从上面的代码中,message.key 打印该特定记录的键,以及我们如何找到消费者用来反序列化记录的相应模式 ID。
curl -X GET http://localhost:8081/subjects/helpkit_internal.helpkit_support.agents-value/versions/2
{"subject":"helpkit_internal.helpkit_support.agents-value","version":2,"id":33,"schema":"{\"type\":\"record\",\"name\":\"Envelope\",\"namespace\":\"helpkit_internal.helpkit_support.agents\",\"fields\":[{\"name\":\"before\",\"type\":[\"null\",{\"type\":\"record\",\"name\":\"Value\",\"fields\":[{\"name\":\"id\",\"type\":\"long\"},{\"name\":\"user_id\"
Run Code Online (Sandbox Code Playgroud)
我们想要从消费者那里获取 id 值"id":33
请就此提出建议。
python apache-kafka kafka-consumer-api kafka-python confluent-schema-registry