我在这里找到了一些代码https://docs.microsoft.com/en-us/azure/hdinsight/hdinsight-dotnet-avro-serialization#Scenario2与我需要的相反:
//Define the schema in JSON
const string Schema = @"{
""type"":""record"",
""name"":""Microsoft.Hadoop.Avro.Specifications.SensorData"",
""fields"":
[
{
""name"":""Location"",
""type"":
{
""type"":""record"",
""name"":""Microsoft.Hadoop.Avro.Specifications.Location"",
""fields"":
[
{ ""name"":""Floor"", ""type"":""int"" },
{ ""name"":""Room"", ""type"":""int"" }
]
}
},
{ ""name"":""Value"", ""type"":""bytes"" }
]
}";
//Create a generic serializer based on the schema
var serializer = AvroSerializer.CreateGeneric(Schema);
Run Code Online (Sandbox Code Playgroud)
我想采用我创建的模型:
[DataContract(Name = "Demo", Namespace = "pubsub.demo")]
public class Demo
{
[DataMember(Name = "value")]
public long Value { get; set; }
}
Run Code Online (Sandbox Code Playgroud)
...并将此 C# …
我是 Confluence/Kafka 的新手,我想从 kafka 中查找元数据信息
我想知道
Confluence版本是5.0
可以提供此信息的类(方法)是什么?
是否有任何 Rest API 用于相同的情况?
还需要连接 Zookeeper 才能获取此信息。
apache-kafka kafka-producer-api confluent-schema-registry confluent-platform
我们有一个 kafka 集群,运行着存储在 Confluence 的 Schema-registry 中的 Avro schema。在最近重新部署(其中一个)我们的流应用程序时,我们开始在单个主题(EmailSent)上看到不兼容的架构错误。这是唯一失败的主题,每当向该主题提交新的 EmailSent 事件时,我们都会收到错误。
Caused by:org.apache.kafka.common.errors.SerializationException: Error registering Avro schema: {"type":"record","name":"EmailSent","namespace":"com.company_name.communications.schemas","fields":[{"name":"customerId","type":"long","doc":"Customer's ID in the customers service"},{"name":"messageId","type":"long","doc":"The message id of the sent email"},{"name":"sentTime","type":{"type":"string","avro.java.string":"String"},"doc":"The campaign sent time in format 'yyyy-MM-dd HH:mm:ss.SSS'"},{"name":"campaignId","type":"long","doc":"The id of the campaign in the marketing suite"},{"name":"appId","type":["null","long"],"doc":"The app id associated with the sent email, if the email was related to a specific application","default":null}],"version":1}
Caused by:io.confluent.kafka.schemaregistry.client.rest.exceptions.RestClientException: Schema being registered is incompatible with an earlier schema; error code: 409; error code: 409
at io.confluent.kafka.schemaregistry.client.rest.RestService.sendHttpRequest(RestService.java:170)
at …Run Code Online (Sandbox Code Playgroud) avro apache-kafka apache-kafka-streams confluent-schema-registry
我已经在 Ubuntu 16.04 机器上安装了合流平台,最初我已经配置了 Zookeeper、Kafka 和 ksql 并启动了合流平台。我能够看到下面的消息。
root@DESKTOP-DIB3097:/opt/kafkafull/confluent-5.1.0/bin# ./confluent start
This CLI is intended for development only, not for production
https://docs.confluent.io/current/cli/index.html
Using CONFLUENT_CURRENT: /tmp/confluent.HUlCltYT
Starting zookeeper
zookeeper is [UP]
Starting kafka
kafka is [UP]
Starting schema-registry
schema-registry is [UP]
Starting kafka-rest
kafka-rest is [UP]
Starting connect
connect is [UP]
Starting ksql-server
ksql-server is [UP]
Starting control-center
control-center is [UP]
Run Code Online (Sandbox Code Playgroud)
现在一切都准备好了,当我检查汇合平台的状态时,我发现架构注册表、连接和控制中心都已关闭。
我检查了模式注册表的日志并发现了以下日志。
ERROR Error starting the schema registry (io.confluent.kafka.schemaregistry.rest.SchemaRegistryRestApplication)
io.confluent.kafka.schemaregistry.exceptions.SchemaRegistryInitializationException: Error initializing kafka store while initializing schema registry
at io.confluent.kafka.schemaregistry.storage.KafkaSchemaRegistry.init(KafkaSchemaRegistry.java:210) …Run Code Online (Sandbox Code Playgroud) 我们使用 Confluence SchemaRegistry 和 KafkaStreams 已经一年多了,一切都运行良好;直到昨天。
在 UAT 环境中,我们似乎删除了一个架构主题,并且我们的一个应用程序开始故障转移并显示以下消息
[错误] LogAndFailExceptionHandler - 反序列化期间捕获异常,taskId:0_13,主题:TOPIC_NAME,分区:13,偏移量:0 org.apache.kafka.common.errors.SerializationException:检索 id 1531 的 Avro 架构时出错
我检查了架构注册表,发现主题丢失了,并使用curl查询错误中列出的id 1531,例如:
curl -X GET http://SchemaRegistryHost:8081/schemas/ids/1531
Run Code Online (Sandbox Code Playgroud)
并回来了:
{"error_code":40403,"message":"Schema not found"}
Run Code Online (Sandbox Code Playgroud)
我天真地只是尝试再次注册架构,没有考虑它,它起作用了,但是注册架构的 id 与之前的 1531 ID 不同。
我需要将架构注册到 ID 1531,因为主题中的现有消息已在魔术字节中包含该 Id 1531。
我在https://docs.confluence.io/current/schema-registry/docs/develop/api.html检查了 API 文档,但没有看到任何用于为模式设置给定 Id 的内容。
无论如何,是否可以使用架构注册表将架构强制指定为特定 ID?
我知道一些备份解决方案,但我现在正在寻找一种修复方法,希望能够防止数据丢失或采取特殊措施来修复主题数据。
我们正在使用镜像生成器来同步本地和 AWS Kafka 主题。如何在其他集群(本例中为 AWS)中以完全相同的方式复制在本地注册的架构的主题?如何使用镜像制作器复制 Avro 架构?
apache-kafka confluent-schema-registry apache-kafka-mirrormaker
我的源连接器抛出
Caused by: io.confluent.kafka.schemaregistry.client.rest.exceptions.RestClientException: Error while forwarding register schema request to the master; error code: 50003
或者
Caused by: io.confluent.kafka.schemaregistry.client.rest.exceptions.RestClientException: Master not known
我发现当模式注册表的主更改并且我在 k8s 上的同一服务下有两个模式注册表副本时会发生这种情况。
最上面的异常是org.apache.kafka.connect.errors.ConnectException: Tolerance exceeded in error handler
如何增加容忍度,以便连接器可以重试更多次,直到选出新的主节点?
在构建gradle文件中添加汇合kafka的依赖项时,无法解决它。
compile group: 'io.confluent', name: 'kafka-avro-serializer', version: '4.0.0'
compile group: 'io.confluent', name: 'kafka-schema-registry', version: '4.0.0'
compile 'io.confluent:kafka-schema-registry:4.0.0:tests'
Run Code Online (Sandbox Code Playgroud)
添加后出现以下错误。
java apache-kafka confluent-schema-registry confluent-platform
我想通过 Kafka Rest Proxy 生成一个 kafka 主题。我已经在架构注册表中创建了一个 JSON 架构,我希望所有消息都根据已注册的架构进行验证,如果它们与架构不匹配,则将其拒绝。
我的架构
{
"type": "object",
"properties": {
"foo": {
"type": "string",
},
"bar": {
"type": "number"
}
}
}
Run Code Online (Sandbox Code Playgroud)
该架构已正确注册并分配版本 1。然后我尝试为两者生成数据类型错误的消息foo,bar但该消息被接受。
curl --location --request POST 'http://localhost:8082/topics/test' \
--header 'Content-Type: application/vnd.kafka.jsonschema.v2+json' \
--header 'Accept: application/vnd.kafka.v2+json' \
--data-raw '{
"value_schema_id": 1,
"records": [
{
"value": {
"foo": 10,
"bar":"not a number"
}
}
]
}'
Run Code Online (Sandbox Code Playgroud)
请注意,我正在生成test具有关联模式的主题,但无论如何都会接受错误消息。我还尝试添加"value_schema_id": 1以确保有效负载中引用了架构,但错误消息仍然被接受。
但是,如果我传递 JSON 架构,因为value_schema它按预期工作
{
"value_schema": "{\"type\": \"object\",\"properties\": …Run Code Online (Sandbox Code Playgroud) jsonschema apache-kafka confluent-schema-registry confluent-rest-proxy
我读到Docker (Compose) clientconnectstokakatooearly,但它没有给出要检查的命令。
我应该如何配置我的 kafka 代理,以便在 Zookeeper 未准备好时重试?由于 kafka 代理尚未准备好,我的模式注册表也失败。
我的 docker-compose 文件:
zookeeper:
image: confluentinc/cp-zookeeper:5.5.3
hostname: zookeeper
container_name: zookeeper
ports:
- "2181:2181"
environment:
ZOOKEEPER_CLIENT_PORT: 2181
ZOOKEEPER_TICK_TIME: 2000
broker:
image: confluentinc/cp-server:5.5.3
hostname: broker
container_name: broker
depends_on:
- zookeeper
ports:
- "9092:9092"
environment:
KAFKA_BROKER_ID: 1
KAFKA_ZOOKEEPER_CONNECT: 'zookeeper:2181'
KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT
KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://broker:29092,PLAINTEXT_HOST://localhost:9092
KAFKA_METRIC_REPORTERS: io.confluent.metrics.reporter.ConfluentMetricsReporter
KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS: 0
KAFKA_CONFLUENT_LICENSE_TOPIC_REPLICATION_FACTOR: 1
CONFLUENT_METRICS_REPORTER_BOOTSTRAP_SERVERS: broker:29092
CONFLUENT_METRICS_REPORTER_ZOOKEEPER_CONNECT: zookeeper:2181
CONFLUENT_METRICS_REPORTER_TOPIC_REPLICAS: 1
CONFLUENT_METRICS_ENABLE: 'true'
CONFLUENT_SUPPORT_CUSTOMER_ID: 'anonymous'
schema-registry:
image: confluentinc/cp-schema-registry:5.5.3
hostname: schema-registry
container_name: schema-registry
depends_on:
- zookeeper …Run Code Online (Sandbox Code Playgroud)