标签: confluent-schema-registry

C# 模型可以序列化为 AVRO JSON 模式吗?

我在这里找到了一些代码https://docs.microsoft.com/en-us/azure/hdinsight/hdinsight-dotnet-avro-serialization#Scenario2与我需要的相反

//Define the schema in JSON
const string Schema = @"{
    ""type"":""record"",
    ""name"":""Microsoft.Hadoop.Avro.Specifications.SensorData"",
    ""fields"":
        [
            {
                ""name"":""Location"",
                ""type"":
                    {
                        ""type"":""record"",
                        ""name"":""Microsoft.Hadoop.Avro.Specifications.Location"",
                        ""fields"":
                            [
                                { ""name"":""Floor"", ""type"":""int"" },
                                { ""name"":""Room"", ""type"":""int"" }
                            ]
                    }
            },
            { ""name"":""Value"", ""type"":""bytes"" }
        ]
}";

//Create a generic serializer based on the schema
var serializer = AvroSerializer.CreateGeneric(Schema);
Run Code Online (Sandbox Code Playgroud)

我想采用我创建的模型:

[DataContract(Name = "Demo", Namespace = "pubsub.demo")]
public class Demo
{
    [DataMember(Name = "value")]
    public long Value { get; set; }
}
Run Code Online (Sandbox Code Playgroud)

...并将此 C# …

c# avro confluent-schema-registry

6
推荐指数
1
解决办法
6494
查看次数

来自kafka的元数据信息

我是 Confluence/Kafka 的新手,我想从 kafka 中查找元数据信息

我想知道

  1. 生产者名单
  2. 主题列表
  3. 主题的架构信息

Confluence版本是5.0

可以提供此信息的类(方法)是什么?
是否有任何 Rest API 用于相同的情况?
还需要连接 Zookeeper 才能获取此信息。

apache-kafka kafka-producer-api confluent-schema-registry confluent-platform

5
推荐指数
1
解决办法
7268
查看次数

Schema-Registry 拒绝未更改的模式,因为它不兼容

我们有一个 kafka 集群,运行着存储在 Confluence 的 Schema-registry 中的 Avro schema。在最近重新部署(其中一个)我们的流应用程序时,我们开始在单个主题(EmailSent)上看到不兼容的架构错误。这是唯一失败的主题,每当向该主题提交新的 EmailSent 事件时,我们都会收到错误。

Caused by:org.apache.kafka.common.errors.SerializationException: Error registering Avro schema: {"type":"record","name":"EmailSent","namespace":"com.company_name.communications.schemas","fields":[{"name":"customerId","type":"long","doc":"Customer's ID in the customers service"},{"name":"messageId","type":"long","doc":"The message id of the sent email"},{"name":"sentTime","type":{"type":"string","avro.java.string":"String"},"doc":"The campaign sent time in format 'yyyy-MM-dd HH:mm:ss.SSS'"},{"name":"campaignId","type":"long","doc":"The id of the campaign in the marketing suite"},{"name":"appId","type":["null","long"],"doc":"The app id associated with the sent email, if the email was related to a specific application","default":null}],"version":1}
Caused by:io.confluent.kafka.schemaregistry.client.rest.exceptions.RestClientException: Schema being registered is incompatible with an earlier schema; error code: 409; error code: 409
    at io.confluent.kafka.schemaregistry.client.rest.RestService.sendHttpRequest(RestService.java:170)
    at …
Run Code Online (Sandbox Code Playgroud)

avro apache-kafka apache-kafka-streams confluent-schema-registry

5
推荐指数
1
解决办法
6761
查看次数

无法启动 Confluence 架构注册表

我已经在 Ubuntu 16.04 机器上安装了合流平台,最初我已经配置了 Zookeeper、Kafka 和 ksql 并启动了合流平台。我能够看到下面的消息。

 root@DESKTOP-DIB3097:/opt/kafkafull/confluent-5.1.0/bin# ./confluent start
 This CLI is intended for development only, not for production
 https://docs.confluent.io/current/cli/index.html
 Using CONFLUENT_CURRENT: /tmp/confluent.HUlCltYT
 Starting zookeeper
 zookeeper is [UP]
 Starting kafka
 kafka is [UP]
 Starting schema-registry
 schema-registry is [UP]
 Starting kafka-rest
 kafka-rest is [UP]
 Starting connect
 connect is [UP]
 Starting ksql-server
 ksql-server is [UP]
 Starting control-center
 control-center is [UP]
Run Code Online (Sandbox Code Playgroud)

现在一切都准备好了,当我检查汇合平台的状态时,我发现架构注册表、连接和控制中心都已关闭。

我检查了模式注册表的日志并发现了以下日志。


ERROR Error starting the schema registry (io.confluent.kafka.schemaregistry.rest.SchemaRegistryRestApplication)
io.confluent.kafka.schemaregistry.exceptions.SchemaRegistryInitializationException: Error initializing kafka store while initializing schema registry
        at io.confluent.kafka.schemaregistry.storage.KafkaSchemaRegistry.init(KafkaSchemaRegistry.java:210) …
Run Code Online (Sandbox Code Playgroud)

apache-kafka confluent-schema-registry confluent-platform

5
推荐指数
2
解决办法
1万
查看次数

使用特定 ID 将架构添加到架构注册表

我们使用 Confluence SchemaRegistry 和 KafkaStreams 已经一年多了,一切都运行良好;直到昨天。

在 UAT 环境中,我们似乎删除了一个架构主题,并且我们的一个应用程序开始故障转移并显示以下消息

[错误] LogAndFailExceptionHandler - 反序列化期间捕获异常,taskId:0_13,主题:TOPIC_NAME,分区:13,偏移量:0 org.apache.kafka.common.errors.SerializationException:检索 id 1531 的 Avro 架构时出错

我检查了架构注册表,发现主题丢失了,并使用curl查询错误中列出的id 1531,例如:

curl -X GET http://SchemaRegistryHost:8081/schemas/ids/1531
Run Code Online (Sandbox Code Playgroud)

并回来了:

{"error_code":40403,"message":"Schema not found"}
Run Code Online (Sandbox Code Playgroud)

我天真地只是尝试再次注册架构,没有考虑它,它起作用了,但是注册架构的 id 与之前的 1531 ID 不同。

我需要将架构注册到 ID 1531,因为主题中的现有消息已在魔术字节中包含该 Id 1531。

我在https://docs.confluence.io/current/schema-registry/docs/develop/api.html检查了 API 文档,但没有看到任何用于为模式设置给定 Id 的内容。

无论如何,是否可以使用架构注册表将架构强制指定为特定 ID?

我知道一些备份解决方案,但我现在正在寻找一种修复方法,希望能够防止数据丢失或采取特殊措施来修复主题数据。

java apache-kafka confluent-schema-registry

5
推荐指数
2
解决办法
2971
查看次数

如何使用Kafka镜像制作器复制模式?

我们正在使用镜像生成器来同步本地和 AWS Kafka 主题。如何在其他集群(本例中为 AWS)中以完全相同的方式复制在本地注册的架构的主题?如何使用镜像制作器复制 Avro 架构?

apache-kafka confluent-schema-registry apache-kafka-mirrormaker

5
推荐指数
1
解决办法
3819
查看次数

当模式注册表的主更改时连接器失败

我的源连接器抛出 Caused by: io.confluent.kafka.schemaregistry.client.rest.exceptions.RestClientException: Error while forwarding register schema request to the master; error code: 50003

或者 Caused by: io.confluent.kafka.schemaregistry.client.rest.exceptions.RestClientException: Master not known

我发现当模式注册表的主更改并且我在 k8s 上的同一服务下有两个模式注册表副本时会发生这种情况。

最上面的异常是org.apache.kafka.connect.errors.ConnectException: Tolerance exceeded in error handler

如何增加容忍度,以便连接器可以重试更多次,直到选出新的主节点?

apache-kafka apache-kafka-connect confluent-schema-registry

5
推荐指数
1
解决办法
1679
查看次数

无法解决 gradle 中的融合 kafka 依赖关系?

在构建gradle文件中添加汇合kafka的依赖项时,无法解决它。

   compile group: 'io.confluent', name: 'kafka-avro-serializer', version: '4.0.0'
   compile group: 'io.confluent', name: 'kafka-schema-registry', version: '4.0.0'
   compile 'io.confluent:kafka-schema-registry:4.0.0:tests'
Run Code Online (Sandbox Code Playgroud)

添加后出现以下错误。

在此输入图像描述

java apache-kafka confluent-schema-registry confluent-platform

5
推荐指数
2
解决办法
1万
查看次数

Kafka Rest 代理 JSON 模式验证

我想通过 Kafka Rest Proxy 生成一个 kafka 主题。我已经在架构注册表中创建了一个 JSON 架构,我希望所有消息都根据已注册的架构进行验证,如果它们与架构不匹配,则将其拒绝。

我的架构

{
  "type": "object",
  "properties": {
    "foo": {
      "type": "string",
    },
    "bar": {
      "type": "number" 
    }
  }
}
Run Code Online (Sandbox Code Playgroud)

该架构已正确注册并分配版本 1。然后我尝试为两者生成数据类型错误的消息foobar但该消息被接受。

curl --location --request POST 'http://localhost:8082/topics/test' \
--header 'Content-Type: application/vnd.kafka.jsonschema.v2+json' \
--header 'Accept: application/vnd.kafka.v2+json' \
--data-raw '{
    "value_schema_id": 1,
    "records": [
        {
            "value": {
                "foo": 10,
                "bar":"not a number"
            }
        }
    ]
}'
Run Code Online (Sandbox Code Playgroud)

请注意,我正在生成test具有关联模式的主题,但无论如何都会接受错误消息。我还尝试添加"value_schema_id": 1以确保有效负载中引用了架构,但错误消息仍然被接受。

但是,如果我传递 JSON 架构,因为value_schema它按预期工作

{
    "value_schema": "{\"type\": \"object\",\"properties\": …
Run Code Online (Sandbox Code Playgroud)

jsonschema apache-kafka confluent-schema-registry confluent-rest-proxy

5
推荐指数
1
解决办法
1582
查看次数

docker-compose kafka等待zookeeper和schema-registry等待kafka

我读到Docker (Compose) clientconnectstokakatooearly,但它没有给出要检查的命令。

我应该如何配置我的 kafka 代理,以便在 Zookeeper 未准备好时重试?由于 kafka 代理尚未准备好,我的模式注册表也失败。

我的 docker-compose 文件:

  zookeeper:
    image: confluentinc/cp-zookeeper:5.5.3
    hostname: zookeeper
    container_name: zookeeper
    ports:
      - "2181:2181"
    environment:
      ZOOKEEPER_CLIENT_PORT: 2181
      ZOOKEEPER_TICK_TIME: 2000

  broker:
    image: confluentinc/cp-server:5.5.3
    hostname: broker
    container_name: broker
    depends_on:
      - zookeeper
    ports:
      - "9092:9092"
    environment:
      KAFKA_BROKER_ID: 1
      KAFKA_ZOOKEEPER_CONNECT: 'zookeeper:2181'
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://broker:29092,PLAINTEXT_HOST://localhost:9092
      KAFKA_METRIC_REPORTERS: io.confluent.metrics.reporter.ConfluentMetricsReporter
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
      KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS: 0
      KAFKA_CONFLUENT_LICENSE_TOPIC_REPLICATION_FACTOR: 1
      CONFLUENT_METRICS_REPORTER_BOOTSTRAP_SERVERS: broker:29092
      CONFLUENT_METRICS_REPORTER_ZOOKEEPER_CONNECT: zookeeper:2181
      CONFLUENT_METRICS_REPORTER_TOPIC_REPLICAS: 1
      CONFLUENT_METRICS_ENABLE: 'true'
      CONFLUENT_SUPPORT_CUSTOMER_ID: 'anonymous'

  schema-registry:
    image: confluentinc/cp-schema-registry:5.5.3
    hostname: schema-registry
    container_name: schema-registry
    depends_on:
      - zookeeper …
Run Code Online (Sandbox Code Playgroud)

apache-kafka docker confluent-schema-registry

5
推荐指数
1
解决办法
2733
查看次数