标签: confluent-schema-registry

将 Avro 架构保存到 Confluence Schema-Registry

我正在尝试使用 Postman 向注册表写入一个非常简单的模式,并且一直很难让它注册。只是注册一个简单的模式真的这么复杂吗?这只是整个过程的第一步,还是我在这里遗漏了一些东西?我正在使用的架构如下:

    {
"schema":{
  "type" : "record",
  "name" : "User",
  "namespace" : "com.temp.avro.model",
  "fields" : [ {
    "name" : "_id",
    "type" : "string"
  }, {
    "name" : "updatedDate",
    "type":"long",
    "logicalType":"timestamp-millis"
  }, {
    "name" : "createdDate",
    "type":"long",
    "logicalType":"timestamp-millis"
  }, {
    "name" : "applicationId",
    "type": ["null", "string"],
    "default": null
  },{
    "name" : "country",
    "type" : "string"
  }, {
    "name" : "bank",
    "type" : "string"
  }]
}
}
Run Code Online (Sandbox Code Playgroud)

我收到以下错误:

Internal Server Error com.fasterxml.jackson.databind.JsonMappingException: Can not deserialize instance of java.lang.String out of START_OBJECT …
Run Code Online (Sandbox Code Playgroud)

avro confluent-schema-registry confluent-platform

2
推荐指数
1
解决办法
1404
查看次数

在模式注册表中,消费者的模式可能与生产者的模式不同,这实际上意味着什么

在向 Kafka 生成 AVRO 数据时,Avro 序列化程序在写入数据时使用的字节数组中写入相同的架构 ID。

Kafka Consumer 根据收到的字节数组中的模式 ID 从模式注册表中获取模式。因此,生产者和消费者都使用相同的模式 ID,因此模式也如此。

但为什么包括这篇文章在内的许多文章都说消费者的模式可能与生产者的模式不同。

请帮助我理解这一点。

avro apache-kafka kafka-producer-api confluent-schema-registry

2
推荐指数
1
解决办法
1059
查看次数

如何使用Spring云流访问受密码保护的汇合模式注册表服务器?

我将 Spring Cloud Stream 与A​​iven的架构注册表一起使用,后者使用Confluence 的架构注册表。Aiven 的架构注册表受密码保护。根据这些说明,需要设置这两个配置参数才能成功访问架构注册表服务器。

 props.put("basic.auth.credentials.source", "USER_INFO");
 props.put("basic.auth.user.info", "avnadmin:schema-reg-password");
Run Code Online (Sandbox Code Playgroud)

当我只使用vanilla java的kafka驱动程序时一切都很好,但是如果我使用Spring云流,我不知道如何注入这两个参数。目前,我正在将"basic.auth.user.info""basic.auth.credentials.source"放入文件"spring.cloud.stream.kafka.binder.configuration"application.yml

这样做,我就进入"401 Unauthorized"了模式想要注册的线路。

更新1:

根据 Ali n 的建议,我更新了 SchemaRegistryClient 的 bean 的配置方式,以便它能够识别 SSL 上下文。

@Bean
public SchemaRegistryClient schemaRegistryClient(
    @Value("${spring.cloud.stream.schemaRegistryClient.endpoint}") String endpoint) {
  try {
    final KeyStore keyStore = KeyStore.getInstance("PKCS12");
    keyStore.load(new FileInputStream(
            new File("path/to/client.keystore.p12")),
        "secret".toCharArray());

    final KeyStore trustStore = KeyStore.getInstance("JKS");
    trustStore.load(new FileInputStream(
            new File("path/to/client.truststore.jks")),
        "secret".toCharArray());

    TrustStrategy acceptingTrustStrategy = (X509Certificate[] chain, String authType) -> …
Run Code Online (Sandbox Code Playgroud)

avro apache-kafka spring-cloud-stream confluent-schema-registry aiven

2
推荐指数
1
解决办法
8127
查看次数

无法反序列化主题数据

我正在使用这里的汇合 cp-all-in-one 项目配置:https://github.com/confluenceinc/cp-docker-images/blob/5.2.2-post/examples/cp-all-in-one /docker-compose.yml

http://localhost:8082/topics/zuum-positions 我正在使用以下 AVRO 正文发布一条消息:

{  
   "key_schema": "{\"type\":\"string\"}",
   "value_schema":"{  \"type\":\"record\",\"name\":\"Position\",\"fields\":[  {  \"name\":\"loadId\",\"type\":\"double\"},{\"name\":\"lat\",\"type\":\"double\"},{  \"name\":\"lon\",\"type\":\"double\"}]}",
   "records":[  
      {  
         "key":"22",
         "value":{  
            "lat":43.33,
            "lon":43.33,
            "loadId":22
         }
      }
   ]
}
Run Code Online (Sandbox Code Playgroud)

我已将以下标头正确添加到上述 POST 请求中: Content-Type: application/vnd.kafka.avro.v2+json Accept: application/vnd.kafka.v2+json

执行此请求时,我在 docker 日志中看到以下异常:

Error encountered in task zuum-sink-positions-0. Executing stage 'VALUE_CONVERTER' with class 'io.confluent.connect.avro.AvroConverter', where consumed record is {topic='zuum-positions', partition=0, offset=25, timestamp=1563480487456, timestampType=CreateTime}. org.apache.kafka.connect.errors.DataException: Failed to deserialize data for topic zuum-positions to Avro: 
connect            |    at io.confluent.connect.avro.AvroConverter.toConnectData(AvroConverter.java:107)
connect            |    at org.apache.kafka.connect.runtime.WorkerSinkTask.lambda$convertAndTransformRecord$1(WorkerSinkTask.java:487)
connect            |    at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndRetry(RetryWithToleranceOperator.java:128) …
Run Code Online (Sandbox Code Playgroud)

apache-kafka docker confluent-schema-registry kafka-rest confluent-platform

2
推荐指数
1
解决办法
1万
查看次数

为 ECS 中运行的架构注册表实例提供主机名

我正在使用 ECS 集群为我们的 MSK Kafka 集群构建托管在 Amazon 中的冗余架构注册表。

SchemaRegistry TaskDefinition 需要定义每个任务运行时唯一的主机名。

SchemaRegistryTaskDefinition:
    Type: AWS::ECS::TaskDefinition
    Properties:
      Family: !Ref SchemaRegistryTaskName
      RequiresCompatibilities: [ EC2 ]
      NetworkMode: bridge
      Cpu: !Ref CPUReservation
      Memory: !Ref MemoryReservation
      Volumes: []
      ContainerDefinitions:
        - Name: !Ref SchemaRegistryTaskName
          Image: !Ref SchemaRegistryTaskImage
          Essential: true
          PortMappings:
            - ContainerPort: !Ref SchemaRegistryPort
              HostPort: 0 # Randomly assigned port from the ephemeral port range.
          Environment:
            - Name: AWS_DEFAULT_REGION
              Value: !Ref AWS::Region
            - Name: SCHEMA_REGISTRY_KAFKASTORE_BOOTSTRAP_SERVERS
              Value: !Ref MskBrokerUrls
            - Name: SCHEMA_REGISTRY_HOST_NAME
              Value: $HOSTNAME
          LogConfiguration:
            LogDriver: awslogs
            Options: …
Run Code Online (Sandbox Code Playgroud)

aws-cloudformation confluent-schema-registry aws-msk

2
推荐指数
2
解决办法
1264
查看次数

在Java中将JSON字符串转换为Avro GenericRecord(生成的Schema是通过相同的Json字符串)

我将 JSON 字符串作为输入,我使用此将 Json 字符串转换为 avro 模式

架构 schema = JsonUtil.inferSchema(JsonUtil.parse(jsonString), "schema");

我有 avro 架构,可以在架构注册表中注册。

我需要同一字符串的通用记录,因为 JSON 数据也保存这些值。

json avro apache-kafka confluent-schema-registry

2
推荐指数
1
解决办法
1万
查看次数

Java 中的 Avro 枚举序列化

我正在尝试将枚举值序列化为 Avro 消息并发送它。该架构是用以下内容构建的:

Schema myschema = SchemaBuilder.record("com.testing.schemas").fields()
.name("enumTest").type().nullable().enumeration("aname")
.symbols("a","b","c","d","e").noDefault();
Run Code Online (Sandbox Code Playgroud)

看起来像这样

{"name":"enumTest","type":
[{"type":"enum","name":"aname",
"symbols":"a","b","c","d","e"]}
,"null"]}
Run Code Online (Sandbox Code Playgroud)

我的代码:

GenericRecord record = new GenericData.Record(myschema);
GenericData.EnumSymbol symbol = new GenericData.EnumSymbol(
myschema.getField("enumTest").schema(), "a");
record.put("enumTest", symbol);
Run Code Online (Sandbox Code Playgroud)

当尝试发送消息时,我收到错误:

Error serializing Avro message
org.apache.avro.UnresolvedUnionException: Not in union [{"type":"enum","name":"aname","namespace":"com.testing.schemas","symbols":["a","b","c","d","e"]},"null"]: a
Run Code Online (Sandbox Code Playgroud)

我缺少什么?谢谢。

java enums avro confluent-schema-registry

2
推荐指数
1
解决办法
4027
查看次数

为什么在 Confluence 的模式注册表中使用主题?

我开始使用 Confluence 架构注册表。我意识到每个主题只有一个模式。

注册表中实体主题的确切用途是什么,它不只是一个模式,例如用于 Kafka 中的主题。

你真的不能在一个主题中放置多个模式,对吗?

avro apache-kafka confluent-schema-registry

2
推荐指数
1
解决办法
4531
查看次数

Avro 应该同时用于 Kafka 中的键和值吗?

我们正在努力设置 Kafka 集群并探索 Avro 的使用,但我还没有找到有关 Avro 是否应该用于 Kafka 消息的键和值的指导。我已经探索了这两个用例,但我并没有真正看到在关键级别应用 AVRO 的好处。有什么好的理由这样做吗?后续如果不在密钥上使用 AVRO,首选转换器是什么(字符串、JSON 等)?

avro apache-kafka confluent-schema-registry

2
推荐指数
1
解决办法
2571
查看次数

如何按主题从架构注册表获取 avro 架构?

我在 localhost:8083 上有架构注册表,并且希望按名称获取 avro 架构“test.data”以在其他地方使用。

当我尝试时

$ curl --location --request GET 'http://localhost:8083/subjects?subject=test.data'
Run Code Online (Sandbox Code Playgroud)

我取回注册表中的所有模式。

有没有一种方法可以通过名称获取一个模式?

apache-kafka confluent-schema-registry

2
推荐指数
1
解决办法
1275
查看次数