identityMapCapacityConfluent Schema Registry中的含义是什么CachedSchemaRegistryClient。根据文档,其声明如下:
public CachedSchemaRegistryClient(@NotNull String baseUrl,int identityMapCapacity)
Run Code Online (Sandbox Code Playgroud)
我看到几个帖子,它用int10初始化,在某个地方它是 1000。所以我不确定它到底是什么意思,我应该使用什么。
java avro apache-kafka confluent-schema-registry confluent-platform
我想知道 HDF 套件中嵌入的 kafka 和 Confluent 套件的区别,特别是模式注册工具。
apache-kafka hortonworks-data-platform confluent-schema-registry confluent-platform
我尝试使用模式注册使用 confluent-kafka-python 的 AvroProducer 发布一条 avro 消息。但是代码无法序列化枚举类型。下面是代码和错误跟踪。任何帮助深表感谢。
from confluent_kafka import avro
from confluent_kafka.avro import AvroProducer
from example_schema.schema_classes import SCHEMA as value_schema
from example_schema.com.acme import *
import json
def function():
avroProducer = AvroProducer({ 'bootstrap.servers': 'localhost:9092', 'schema.registry.url': 'http://localhost:8081' }, default_value_schema=value_schema)
print(avroProducer)
obj = Test()
obj.name = 'vinay'
obj.age = 11
obj.sex = 'm'
obj.myenum = Suit.CLUBS
print(str(obj))
avroProducer.produce(topic='test_topic',value=obj)
avroProducer.flush()
function()
File "main.py", line 16, in function
avroProducer.produce(topic='test_topic',value=json.dumps(obj))
File "/home/priv/anaconda3/lib/python3.6/site-packages/confluent_kafka/avro/__init__.py", line 80, in produce
value = self._serializer.encode_record_with_schema(topic, value_schema, value)
File "/home/priv/anaconda3/lib/python3.6/site-packages/confluent_kafka/avro/serializer/message_serializer.py", line 105, …Run Code Online (Sandbox Code Playgroud) python avro apache-kafka confluent-schema-registry confluent-platform
Kafka schema-registry 提供了一种使用通用数据契约对来自 Kafka 的数据进行序列化和反序列化的好方法。然而,数据契约(.avsc 文件)是生产者和消费者之间的粘合剂。
一旦制作人制作了 .avsc 文件,就可以将其签入制作人一侧的版本控制。根据语言,它也会自动生成类。
然而,
avro apache-kafka confluent-schema-registry confluent-platform
My Kafka 和 Schema Registry 基于 Confluent 社区平台 5.2.2,My Spark 有 2.4.4 版本。我开始使用 Spark REPL env:
./bin/spark-shell --packages org.apache.spark:spark-sql-kafka-0-10_2.11:2.4.4,org.apache.spark:spark-avro_2.11:2.4.4
Run Code Online (Sandbox Code Playgroud)
并为 spark 会话设置 Kafka 源:
val brokerServers = "my_confluent_server:9092"
val topicName = "my_kafka_topic_name"
val df = spark
.readStream
.format("kafka")
.option("kafka.bootstrap.servers", brokerServers)
.option("subscribe", topicName)
.load()
Run Code Online (Sandbox Code Playgroud)
我得到了关于键和值的模式信息:
import io.confluent.kafka.schemaregistry.client.rest.RestService
val schemaRegistryURL = "http://my_confluent_server:8081"
val restService = new RestService(schemaRegistryURL)
val keyRestResponseSchemaStr: String = restService.getLatestVersionSchemaOnly(topicName + "-key")
val valueRestResponseSchemaStr: String = restService.getLatestVersionSchemaOnly(topicName + "-value")
Run Code Online (Sandbox Code Playgroud)
首先,如果我用 writeStream 查询“ key ”,即
import org.apache.spark.sql.avro._
import org.apache.spark.sql.streaming.Trigger
import org.apache.spark.sql.DataFrame …Run Code Online (Sandbox Code Playgroud) avro apache-kafka apache-spark confluent-schema-registry spark-structured-streaming
嗨,我正在使用 Confluence kafka。我有返回通用记录的消费者。我想反序列化它。我找不到任何办法。我可以手动完成每个字段,例如
object options = ((GenericRecord)response.Message.Value["Product"])["Options"];
Run Code Online (Sandbox Code Playgroud)
我在这里找到了一个
使用 C# 反序列化 Avro 文件 但是如何将架构转换为流?我想知道我们是否可以使用任何解决方案反序列化到我们的 c# 模型中?任何帮助将不胜感激。谢谢。
c# avro apache-kafka confluent-schema-registry confluent-platform
在 C# 中,我可以定义这两个 POCO 来定义代
public class Family
{
public List<Person> FamilyMembers {get; set;}
}
public class Person
{
public string FirstName {get; set;}
public string LastName {get; set;}
public List<Person> Children {get; set;}
}
Run Code Online (Sandbox Code Playgroud)
我正在尝试定义一个 AVRO 架构来序列化 FamilyMembers。是否可以在 Avro 中定义一个递归数组(不确定这是否是正确的术语),而不必像下面这样在模式中指定每一代。
{
"type": "record",
"name": "family",
"namespace": "com.family.my",
"fields": [
{
"name":"familymember",
"type":{
"type": "array",
"items":{
"name":"person",
"type":"record",
"fields":[
{"name":"firstname", "type":"string"},
{"name":"lastname", "type":"string"},
{"name":"children",
"type":{
"type": "array",
"items":{
"name":"children",
"type":"record",
"fields":[
{"name":"firstname", "type":"string"},
{"name":"lastname", "type":"string"},
{"name":"grandchildren",
"type":{
"type": "array",
"items":{ …Run Code Online (Sandbox Code Playgroud) 在架构注册表中添加条目的“简单”语法如下:
curl -X POST -H "Content-Type: application/vnd.schemaregistry.v1+json" --data '{"schema": "{\"type\":\"record\",\"name\":\"Payment\",\"namespace\":\"io.confluent.examples.clients.basicavro\",\"fields\":[{\"name\":\"id\",\"type\":\"string\"},{\"name\":\"amount\",\"type\":\"double\"}]}"}' http://localhost:8081/subjects/test-value/versions
Run Code Online (Sandbox Code Playgroud)
然而,当从终端执行此操作时,如果模式很大,这可能会非常繁重且不切实际。有没有一种方便的方法不使用内联模式(即模式内容)来涂抹curl命令,而只传递avro模式文件(avsc)?
我知道有一个python 工具可以做到这一点:
$ python register_schema.py http://localhost:8081 persons-avro person.avsc
Run Code Online (Sandbox Code Playgroud)
我也知道我可以通过 http 请求在 Java 中做到这一点(使用大量样板代码)
但我想知道是否有一种方法可以直接从命令行执行此操作(没有 python,在普通的 bash 中)
所以我正在学习 Kafka,并尝试在 docker compose 文件的帮助下在本地环境中设置它。我正在遵循以下示例:
https://docs.confluence.io/5.0.0/installation/docker/docs/installation/connect-avro-jdbc.html
按照这个例子,我已经完成了相当多的工作,直到进入步骤 8 的后半部分。
当尝试在 Kafka Connect 容器内部执行以下操作时kafka-avro-console-consumer --bootstrap-server kafka:9092 --topic quickstart-jdbc-test --from-beginning --max-messages 10,我收到以下消息,但无法确定它尝试连接的内容:
[2020-10-07 20:45:44,784] INFO Registered kafka:type=kafka.Log4jController MBean (kafka.utils.Log4jControllerRegistration$)
[2020-10-07 20:45:45,431] INFO ConsumerConfig values:
auto.commit.interval.ms = 5000
auto.offset.reset = earliest
bootstrap.servers = [kafka:9092]
check.crcs = true
client.id =
connections.max.idle.ms = 540000
default.api.timeout.ms = 60000
enable.auto.commit = true
exclude.internal.topics = true
fetch.max.bytes = 52428800
fetch.max.wait.ms = 500
fetch.min.bytes = 1
group.id = console-consumer-7022
heartbeat.interval.ms = 3000
interceptor.classes = []
internal.leave.group.on.close = …Run Code Online (Sandbox Code Playgroud) 我想运行集成测试来测试我的 kafka 监听器和 avro 序列化。这需要一个 Kafka 和一个 Schema 注册表(也可以称为 Zookeeper)。
测试时,我当前必须使用 docker-compose.yml,但我想通过 testcontainers 构建所需的容器来减少用户错误。Kafka 和 Zookeeper 实例启动得很好,看起来工作得很好——我的应用程序可以创建所需的主题,并且监听器也被订阅,我什至可以通过 kafka 控制台生产者发送消息。
不起作用的是 SchemaRegistry。容器启动,显然连接到了ZK,但无法建立到broker的连接。它会重试连接一段时间,直到超时,然后容器停止。因此,我无法在测试中注册和读取用于(反)序列化的 avro 模式,因此失败。
我找不到 SR 显然可以连接到 ZK 但找不到我的经纪人的原因。
有人也遇到过这个问题吗?你成功运行了吗?如果是这样,怎么会这样?我需要 Kafka 和架构注册表测试容器完全可用于我的测试,因此不能忽略其中任何一个。
我也可以继续使用 docker-compose.yml,但我真的很想完全以编程方式设置我的测试环境。
架构注册表容器记录以下内容:
2023-02-08 16:56:09 [2023-02-08 15:56:09,556] INFO Session establishment complete on server zookeeper/192.168.144.2:2181, session id = 0x1000085b81e0003, negotiated timeout = 40000 (org.apache.zookeeper.ClientCnxn)
2023-02-08 16:56:09 [2023-02-08 15:56:09,696] INFO Session: 0x1000085b81e0003 closed (org.apache.zookeeper.ZooKeeper)
2023-02-08 16:56:09 [2023-02-08 15:56:09,696] INFO EventThread shut down for session: 0x1000085b81e0003 (org.apache.zookeeper.ClientCnxn)
2023-02-08 16:56:09 [2023-02-08 15:56:09,787] INFO …Run Code Online (Sandbox Code Playgroud) java apache-kafka docker-compose testcontainers confluent-schema-registry