Hem*_*mar 7 python apache-kafka kafka-consumer-api kafka-python confluent-schema-registry
我们使用 schema 注册表来存储 schema,消息被序列化到 avro 并推送到 kafka 主题。
想知道,当从消费者读取数据时,如何找到 avro 记录被序列化的 schema id。我们需要此架构 ID 来跟踪是否将新列添加到表中的更改。如果添加或删除新列,架构注册表中将生成新的架构 id,以及如何在消费者中获取该 id。
consumer = KafkaConsumer(bootstrap_servers = conf['BOOTSTRAP_SERVERS'],
auto_offset_reset = conf['AUTO_OFFSET'],
enable_auto_commit = conf['AUTO_COMMIT'],
auto_commit_interval_ms = conf['AUTO_COMMIT_INTERVAL']
)
consumer.subscribe(conf['KAFKA_TOPICS'])
for message in consumer:
print(message.key)
Run Code Online (Sandbox Code Playgroud)
从上面的代码中,message.key 打印该特定记录的键,以及我们如何找到消费者用来反序列化记录的相应模式 ID。
curl -X GET http://localhost:8081/subjects/helpkit_internal.helpkit_support.agents-value/versions/2
{"subject":"helpkit_internal.helpkit_support.agents-value","version":2,"id":33,"schema":"{\"type\":\"record\",\"name\":\"Envelope\",\"namespace\":\"helpkit_internal.helpkit_support.agents\",\"fields\":[{\"name\":\"before\",\"type\":[\"null\",{\"type\":\"record\",\"name\":\"Value\",\"fields\":[{\"name\":\"id\",\"type\":\"long\"},{\"name\":\"user_id\"
Run Code Online (Sandbox Code Playgroud)
我们想要从消费者那里获取 id 值"id":33
请就此提出建议。
您实际上可以做的是获取给定主题的主题的最新模式 id:
from confluent_kafka.avro.cached_schema_registry_client import CachedSchemaRegistryClient
sr = CachedSchemaRegistryClient({
'url': 'http://localhost:8081',
'ssl.certificate.location': '/path/to/cert', # optional
'ssl.key.location': '/path/to/key' # optional
})
value_schema = sr.get_latest_schema("helpkit_internal.helpkit_support.agents-value")[1]
key_schema= sr.get_latest_schema("helpkit_internal.helpkit_support.agents-key")[1]
Run Code Online (Sandbox Code Playgroud)
通过主题名称获取模式
from schema_registry.client import SchemaRegistryClient
sr = SchemaRegistryClient('localhost:8081')
my_schema = sr.get_schema(subject='shelpkit_internal.helpkit_support.agents-value', version='latest')
Run Code Online (Sandbox Code Playgroud)