标签: confluent-schema-registry

模式注册中的 identityMapCapacity 是什么意思

identityMapCapacityConfluent Schema Registry中的含义是什么CachedSchemaRegistryClient。根据文档,其声明如下:

public CachedSchemaRegistryClient(@NotNull String baseUrl,int identityMapCapacity)
Run Code Online (Sandbox Code Playgroud)

我看到几个帖子,它用int10初始化,在某个地方它是 1000。所以我不确定它到底是什么意思,我应该使用什么。

java avro apache-kafka confluent-schema-registry confluent-platform

2
推荐指数
1
解决办法
1319
查看次数

2
推荐指数
1
解决办法
959
查看次数

类型错误:“mappingproxy”类型的对象不是 JSON 可序列化的

我尝试使用模式注册使用 confluent-kafka-python 的 AvroProducer 发布一条 avro 消息。但是代码无法序列化枚举类型。下面是代码和错误跟踪。任何帮助深表感谢。

from confluent_kafka import avro
from confluent_kafka.avro import AvroProducer
from example_schema.schema_classes import SCHEMA as value_schema
from example_schema.com.acme import *
import json

def function():
    avroProducer = AvroProducer({ 'bootstrap.servers': 'localhost:9092', 'schema.registry.url': 'http://localhost:8081' },  default_value_schema=value_schema)
    print(avroProducer)
    obj = Test()
    obj.name = 'vinay'
    obj.age = 11
    obj.sex = 'm'
    obj.myenum = Suit.CLUBS
    print(str(obj))
    avroProducer.produce(topic='test_topic',value=obj)
    avroProducer.flush()

function()

  File "main.py", line 16, in function
    avroProducer.produce(topic='test_topic',value=json.dumps(obj))
  File "/home/priv/anaconda3/lib/python3.6/site-packages/confluent_kafka/avro/__init__.py", line 80, in produce
    value = self._serializer.encode_record_with_schema(topic, value_schema, value)
  File "/home/priv/anaconda3/lib/python3.6/site-packages/confluent_kafka/avro/serializer/message_serializer.py", line 105, …
Run Code Online (Sandbox Code Playgroud)

python avro apache-kafka confluent-schema-registry confluent-platform

2
推荐指数
1
解决办法
5278
查看次数

如何跨团队共享 avro 模式定义

Kafka schema-registry 提供了一种使用通用数据契约对来自 Kafka 的数据进行序列化和反序列化的好方法。然而,数据契约(.avsc 文件)是生产者和消费者之间的粘合剂。

一旦制作人制作了 .avsc 文件,就可以将其签入制作人一侧的版本控制。根据语言,它也会自动生成类。

然而,

  1. 消费者下拉模式定义以供参考的最佳机制是什么?有没有像 swaggerhub 或 avro 的典型 api 文档门户之类的东西?
  2. 如果我们使用 Confluent 平台,控制中心提供了一个 gui 来查看与主题关联的模式,但它也允许用户进行编辑。生产者和消费者团队之间将如何工作?什么会阻止消费者或任何人直接在 Confluent 平台上编辑模式?
  3. 这是我们需要使用rest-proxy自定义构建的东西吗?

avro apache-kafka confluent-schema-registry confluent-platform

2
推荐指数
1
解决办法
652
查看次数

如何将 Confluent Schema Registry 与 from_avro 标准函数一起使用?

My Kafka 和 Schema Registry 基于 Confluent 社区平台 5.2.2,My Spark 有 2.4.4 版本。我开始使用 Spark REPL env:

./bin/spark-shell --packages org.apache.spark:spark-sql-kafka-0-10_2.11:2.4.4,org.apache.spark:spark-avro_2.11:2.4.4
Run Code Online (Sandbox Code Playgroud)

并为 spark 会话设置 Kafka 源:

val brokerServers = "my_confluent_server:9092"
val topicName = "my_kafka_topic_name" 
val df = spark
  .readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", brokerServers)
  .option("subscribe", topicName)
  .load()
Run Code Online (Sandbox Code Playgroud)

我得到了关于键和值的模式信息:

import io.confluent.kafka.schemaregistry.client.rest.RestService
val schemaRegistryURL = "http://my_confluent_server:8081"
val restService = new RestService(schemaRegistryURL)
val keyRestResponseSchemaStr: String = restService.getLatestVersionSchemaOnly(topicName + "-key")
val valueRestResponseSchemaStr: String = restService.getLatestVersionSchemaOnly(topicName + "-value")
Run Code Online (Sandbox Code Playgroud)

首先,如果我用 writeStream 查询“ key ”,即

import org.apache.spark.sql.avro._
import org.apache.spark.sql.streaming.Trigger
import org.apache.spark.sql.DataFrame …
Run Code Online (Sandbox Code Playgroud)

avro apache-kafka apache-spark confluent-schema-registry spark-structured-streaming

2
推荐指数
1
解决办法
2294
查看次数

如何使用 C# 反序列化 Kafka 中的 Avro 消息

嗨,我正在使用 Confluence kafka。我有返回通用记录的消费者。我想反序列化它。我找不到任何办法。我可以手动完成每个字段,例如

 object options = ((GenericRecord)response.Message.Value["Product"])["Options"];
Run Code Online (Sandbox Code Playgroud)

我在这里找到了一个

使用 C# 反序列化 Avro 文件 但是如何将架构转换为流?我想知道我们是否可以使用任何解决方案反序列化到我们的 c# 模型中?任何帮助将不胜感激。谢谢。

c# avro apache-kafka confluent-schema-registry confluent-platform

1
推荐指数
1
解决办法
9914
查看次数

Avro 架构和数组

在 C# 中,我可以定义这两个 POCO 来定义代

public class Family
{
public List<Person> FamilyMembers {get; set;}
}

public class Person
{
public string FirstName {get; set;}
public string LastName {get; set;}
public List<Person> Children {get; set;}
}
Run Code Online (Sandbox Code Playgroud)

我正在尝试定义一个 AVRO 架构来序列化 FamilyMembers。是否可以在 Avro 中定义一个递归数组(不确定这是否是正确的术语),而不必像下面这样在模式中指定每一代。

{
  "type": "record",
  "name": "family",
  "namespace": "com.family.my",
  "fields": [

    {
     "name":"familymember",
            "type":{
                "type": "array",  
                "items":{
                    "name":"person",
                    "type":"record",
                    "fields":[
                                {"name":"firstname", "type":"string"},
                                {"name":"lastname",  "type":"string"},
                                {"name":"children",
                                            "type":{
                                            "type": "array",  
                                            "items":{
                                                    "name":"children",
                                                    "type":"record",
                                                    "fields":[
                                                                {"name":"firstname", "type":"string"},
                                                                {"name":"lastname",  "type":"string"},
                                                                 {"name":"grandchildren",
                                                                                "type":{
                                                                                "type": "array",  
                                                                                "items":{ …
Run Code Online (Sandbox Code Playgroud)

avro confluent-schema-registry

1
推荐指数
1
解决办法
5420
查看次数

使用文件 (Curl) 在 Kafka Schema 注册表中创建新条目

在架构注册表中添加条目的“简单”语法如下:

curl -X POST -H "Content-Type: application/vnd.schemaregistry.v1+json" --data '{"schema": "{\"type\":\"record\",\"name\":\"Payment\",\"namespace\":\"io.confluent.examples.clients.basicavro\",\"fields\":[{\"name\":\"id\",\"type\":\"string\"},{\"name\":\"amount\",\"type\":\"double\"}]}"}' http://localhost:8081/subjects/test-value/versions
Run Code Online (Sandbox Code Playgroud)

然而,当从终端执行此操作时,如果模式很大,这可能会非常繁重且不切实际。有没有一种方便的方法不使用内联模式(即模式内容)来涂抹curl命令,而只传递avro模式文件(avsc)?

我知道有一个python 工具可以做到这一点:

$ python register_schema.py http://localhost:8081 persons-avro person.avsc
Run Code Online (Sandbox Code Playgroud)

我也知道我可以通过 http 请求在 Java 中做到这一点(使用大量样板代码)

但我想知道是否有一种方法可以直接从命令行执行此操作(没有 python,在普通的 bash 中)

apache-kafka confluent-schema-registry

1
推荐指数
1
解决办法
2948
查看次数

Docker Kafka Avro 控制台消费者 - 连接被拒绝

所以我正在学习 Kafka,并尝试在 docker compose 文件的帮助下在本地环境中设置它。我正在遵循以下示例:

https://docs.confluence.io/5.0.0/installation/docker/docs/installation/connect-avro-jdbc.html

按照这个例子,我已经完成了相当多的工作,直到进入步骤 8 的后半部分。

当尝试在 Kafka Connect 容器内部执行以下操作时kafka-avro-console-consumer --bootstrap-server kafka:9092 --topic quickstart-jdbc-test --from-beginning --max-messages 10,我收到以下消息,但无法确定它尝试连接的内容:

[2020-10-07 20:45:44,784] INFO Registered kafka:type=kafka.Log4jController MBean (kafka.utils.Log4jControllerRegistration$)
[2020-10-07 20:45:45,431] INFO ConsumerConfig values:
        auto.commit.interval.ms = 5000
        auto.offset.reset = earliest
        bootstrap.servers = [kafka:9092]
        check.crcs = true
        client.id =
        connections.max.idle.ms = 540000
        default.api.timeout.ms = 60000
        enable.auto.commit = true
        exclude.internal.topics = true
        fetch.max.bytes = 52428800
        fetch.max.wait.ms = 500
        fetch.min.bytes = 1
        group.id = console-consumer-7022
        heartbeat.interval.ms = 3000
        interceptor.classes = []
        internal.leave.group.on.close = …
Run Code Online (Sandbox Code Playgroud)

java apache-kafka docker-compose confluent-schema-registry

1
推荐指数
1
解决办法
2353
查看次数

Testcontainers SchemaRegistry 无法连接到 Kafka 容器

我想运行集成测试来测试我的 kafka 监听器和 avro 序列化。这需要一个 Kafka 和一个 Schema 注册表(也可以称为 Zookeeper)。

测试时,我当前必须使用 docker-compose.yml,但我想通过 testcontainers 构建所需的容器来减少用户错误。Kafka 和 Zookeeper 实例启动得很好,看起来工作得很好——我的应用程序可以创建所需的主题,并且监听器也被订阅,我什至可以通过 kafka 控制台生产者发送消息。

不起作用的是 SchemaRegistry。容器启动,显然连接到了ZK,但无法建立到broker的连接。它会重试连接一段时间,直到超时,然后容器停止。因此,我无法在测试中注册和读取用于(反)序列化的 avro 模式,因此失败。

我找不到 SR 显然可以连接到 ZK 但找不到我的经纪人的原因。

有人也遇到过这个问题吗?你成功运行了吗?如果是这样,怎么会这样?我需要 Kafka 和架构注册表测试容器完全可用于我的测试,因此不能忽略其中任何一个。

我也可以继续使用 docker-compose.yml,但我真的很想完全以编程方式设置我的测试环境。

架构注册表容器记录以下内容:

2023-02-08 16:56:09 [2023-02-08 15:56:09,556] INFO Session establishment complete on server zookeeper/192.168.144.2:2181, session id = 0x1000085b81e0003, negotiated timeout = 40000 (org.apache.zookeeper.ClientCnxn)
2023-02-08 16:56:09 [2023-02-08 15:56:09,696] INFO Session: 0x1000085b81e0003 closed (org.apache.zookeeper.ZooKeeper)
2023-02-08 16:56:09 [2023-02-08 15:56:09,696] INFO EventThread shut down for session: 0x1000085b81e0003 (org.apache.zookeeper.ClientCnxn)
2023-02-08 16:56:09 [2023-02-08 15:56:09,787] INFO …
Run Code Online (Sandbox Code Playgroud)

java apache-kafka docker-compose testcontainers confluent-schema-registry

1
推荐指数
1
解决办法
3894
查看次数