小编Hem*_*mar的帖子

从 kafka 消费者读取数据时,如何从用于 avro 记录的模式注册表中查找模式 id

我们使用 schema 注册表来存储 schema,消息被序列化到 avro 并推送到 kafka 主题。

想知道,当从消费者读取数据时,如何找到 avro 记录被序列化的 schema id。我们需要此架构 ID 来跟踪是否将新列添加到表中的更改。如果添加或删除新列,架构注册表中将生成新的架构 id,以及如何在消费者中获取该 id。

consumer = KafkaConsumer(bootstrap_servers = conf['BOOTSTRAP_SERVERS'],
                        auto_offset_reset = conf['AUTO_OFFSET'],
                        enable_auto_commit = conf['AUTO_COMMIT'],
                        auto_commit_interval_ms = conf['AUTO_COMMIT_INTERVAL']
                        )
consumer.subscribe(conf['KAFKA_TOPICS'])

for message in consumer:
    print(message.key)
Run Code Online (Sandbox Code Playgroud)

从上面的代码中,message.key 打印该特定记录的键,以及我们如何找到消费者用来反序列化记录的相应模式 ID。

curl -X GET http://localhost:8081/subjects/helpkit_internal.helpkit_support.agents-value/versions/2

{"subject":"helpkit_internal.helpkit_support.agents-value","version":2,"id":33,"schema":"{\"type\":\"record\",\"name\":\"Envelope\",\"namespace\":\"helpkit_internal.helpkit_support.agents\",\"fields\":[{\"name\":\"before\",\"type\":[\"null\",{\"type\":\"record\",\"name\":\"Value\",\"fields\":[{\"name\":\"id\",\"type\":\"long\"},{\"name\":\"user_id\"
Run Code Online (Sandbox Code Playgroud)

我们想要从消费者那里获取 id 值"id":33

请就此提出建议。

python apache-kafka kafka-consumer-api kafka-python confluent-schema-registry

7
推荐指数
1
解决办法
5155
查看次数