我正在尝试使用 Postman 向注册表写入一个非常简单的模式,并且一直很难让它注册。只是注册一个简单的模式真的这么复杂吗?这只是整个过程的第一步,还是我在这里遗漏了一些东西?我正在使用的架构如下:
{
"schema":{
"type" : "record",
"name" : "User",
"namespace" : "com.temp.avro.model",
"fields" : [ {
"name" : "_id",
"type" : "string"
}, {
"name" : "updatedDate",
"type":"long",
"logicalType":"timestamp-millis"
}, {
"name" : "createdDate",
"type":"long",
"logicalType":"timestamp-millis"
}, {
"name" : "applicationId",
"type": ["null", "string"],
"default": null
},{
"name" : "country",
"type" : "string"
}, {
"name" : "bank",
"type" : "string"
}]
}
}
Run Code Online (Sandbox Code Playgroud)
我收到以下错误:
Internal Server Error com.fasterxml.jackson.databind.JsonMappingException: Can not deserialize instance of java.lang.String out of START_OBJECT …Run Code Online (Sandbox Code Playgroud) 在向 Kafka 生成 AVRO 数据时,Avro 序列化程序在写入数据时使用的字节数组中写入相同的架构 ID。
Kafka Consumer 根据收到的字节数组中的模式 ID 从模式注册表中获取模式。因此,生产者和消费者都使用相同的模式 ID,因此模式也如此。
但为什么包括这篇文章在内的许多文章都说消费者的模式可能与生产者的模式不同。
请帮助我理解这一点。
avro apache-kafka kafka-producer-api confluent-schema-registry
我将 Spring Cloud Stream 与Aiven的架构注册表一起使用,后者使用Confluence 的架构注册表。Aiven 的架构注册表受密码保护。根据这些说明,需要设置这两个配置参数才能成功访问架构注册表服务器。
props.put("basic.auth.credentials.source", "USER_INFO");
props.put("basic.auth.user.info", "avnadmin:schema-reg-password");
Run Code Online (Sandbox Code Playgroud)
当我只使用vanilla java的kafka驱动程序时一切都很好,但是如果我使用Spring云流,我不知道如何注入这两个参数。目前,我正在将"basic.auth.user.info"和"basic.auth.credentials.source"放入文件"spring.cloud.stream.kafka.binder.configuration"中application.yml。
这样做,我就进入"401 Unauthorized"了模式想要注册的线路。
更新1:
根据 Ali n 的建议,我更新了 SchemaRegistryClient 的 bean 的配置方式,以便它能够识别 SSL 上下文。
@Bean
public SchemaRegistryClient schemaRegistryClient(
@Value("${spring.cloud.stream.schemaRegistryClient.endpoint}") String endpoint) {
try {
final KeyStore keyStore = KeyStore.getInstance("PKCS12");
keyStore.load(new FileInputStream(
new File("path/to/client.keystore.p12")),
"secret".toCharArray());
final KeyStore trustStore = KeyStore.getInstance("JKS");
trustStore.load(new FileInputStream(
new File("path/to/client.truststore.jks")),
"secret".toCharArray());
TrustStrategy acceptingTrustStrategy = (X509Certificate[] chain, String authType) -> …Run Code Online (Sandbox Code Playgroud) avro apache-kafka spring-cloud-stream confluent-schema-registry aiven
我正在使用这里的汇合 cp-all-in-one 项目配置:https://github.com/confluenceinc/cp-docker-images/blob/5.2.2-post/examples/cp-all-in-one /docker-compose.yml
http://localhost:8082/topics/zuum-positions
我正在使用以下 AVRO 正文发布一条消息:
{
"key_schema": "{\"type\":\"string\"}",
"value_schema":"{ \"type\":\"record\",\"name\":\"Position\",\"fields\":[ { \"name\":\"loadId\",\"type\":\"double\"},{\"name\":\"lat\",\"type\":\"double\"},{ \"name\":\"lon\",\"type\":\"double\"}]}",
"records":[
{
"key":"22",
"value":{
"lat":43.33,
"lon":43.33,
"loadId":22
}
}
]
}
Run Code Online (Sandbox Code Playgroud)
我已将以下标头正确添加到上述 POST 请求中:
Content-Type: application/vnd.kafka.avro.v2+json
Accept: application/vnd.kafka.v2+json
执行此请求时,我在 docker 日志中看到以下异常:
Error encountered in task zuum-sink-positions-0. Executing stage 'VALUE_CONVERTER' with class 'io.confluent.connect.avro.AvroConverter', where consumed record is {topic='zuum-positions', partition=0, offset=25, timestamp=1563480487456, timestampType=CreateTime}. org.apache.kafka.connect.errors.DataException: Failed to deserialize data for topic zuum-positions to Avro:
connect | at io.confluent.connect.avro.AvroConverter.toConnectData(AvroConverter.java:107)
connect | at org.apache.kafka.connect.runtime.WorkerSinkTask.lambda$convertAndTransformRecord$1(WorkerSinkTask.java:487)
connect | at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndRetry(RetryWithToleranceOperator.java:128) …Run Code Online (Sandbox Code Playgroud) apache-kafka docker confluent-schema-registry kafka-rest confluent-platform
我正在使用 ECS 集群为我们的 MSK Kafka 集群构建托管在 Amazon 中的冗余架构注册表。
SchemaRegistry TaskDefinition 需要定义每个任务运行时唯一的主机名。
SchemaRegistryTaskDefinition:
Type: AWS::ECS::TaskDefinition
Properties:
Family: !Ref SchemaRegistryTaskName
RequiresCompatibilities: [ EC2 ]
NetworkMode: bridge
Cpu: !Ref CPUReservation
Memory: !Ref MemoryReservation
Volumes: []
ContainerDefinitions:
- Name: !Ref SchemaRegistryTaskName
Image: !Ref SchemaRegistryTaskImage
Essential: true
PortMappings:
- ContainerPort: !Ref SchemaRegistryPort
HostPort: 0 # Randomly assigned port from the ephemeral port range.
Environment:
- Name: AWS_DEFAULT_REGION
Value: !Ref AWS::Region
- Name: SCHEMA_REGISTRY_KAFKASTORE_BOOTSTRAP_SERVERS
Value: !Ref MskBrokerUrls
- Name: SCHEMA_REGISTRY_HOST_NAME
Value: $HOSTNAME
LogConfiguration:
LogDriver: awslogs
Options: …Run Code Online (Sandbox Code Playgroud) 我将 JSON 字符串作为输入,我使用此将 Json 字符串转换为 avro 模式
架构 schema = JsonUtil.inferSchema(JsonUtil.parse(jsonString), "schema");
我有 avro 架构,可以在架构注册表中注册。
我需要同一字符串的通用记录,因为 JSON 数据也保存这些值。
我正在尝试将枚举值序列化为 Avro 消息并发送它。该架构是用以下内容构建的:
Schema myschema = SchemaBuilder.record("com.testing.schemas").fields()
.name("enumTest").type().nullable().enumeration("aname")
.symbols("a","b","c","d","e").noDefault();
Run Code Online (Sandbox Code Playgroud)
看起来像这样
{"name":"enumTest","type":
[{"type":"enum","name":"aname",
"symbols":"a","b","c","d","e"]}
,"null"]}
Run Code Online (Sandbox Code Playgroud)
我的代码:
GenericRecord record = new GenericData.Record(myschema);
GenericData.EnumSymbol symbol = new GenericData.EnumSymbol(
myschema.getField("enumTest").schema(), "a");
record.put("enumTest", symbol);
Run Code Online (Sandbox Code Playgroud)
当尝试发送消息时,我收到错误:
Error serializing Avro message
org.apache.avro.UnresolvedUnionException: Not in union [{"type":"enum","name":"aname","namespace":"com.testing.schemas","symbols":["a","b","c","d","e"]},"null"]: a
Run Code Online (Sandbox Code Playgroud)
我缺少什么?谢谢。
我开始使用 Confluence 架构注册表。我意识到每个主题只有一个模式。
注册表中实体主题的确切用途是什么,它不只是一个模式,例如用于 Kafka 中的主题。
你真的不能在一个主题中放置多个模式,对吗?
我们正在努力设置 Kafka 集群并探索 Avro 的使用,但我还没有找到有关 Avro 是否应该用于 Kafka 消息的键和值的指导。我已经探索了这两个用例,但我并没有真正看到在关键级别应用 AVRO 的好处。有什么好的理由这样做吗?后续如果不在密钥上使用 AVRO,首选转换器是什么(字符串、JSON 等)?
我在 localhost:8083 上有架构注册表,并且希望按名称获取 avro 架构“test.data”以在其他地方使用。
当我尝试时
$ curl --location --request GET 'http://localhost:8083/subjects?subject=test.data'
Run Code Online (Sandbox Code Playgroud)
我取回注册表中的所有模式。
有没有一种方法可以通过名称获取一个模式?