标签: confluent-schema-registry

Kafka模式注册表在同一主题中不兼容

我正在使用Kafka模式注册表来生成/使用Kafka消息,例如,我有两个字段均为字符串类型,伪模式如下所示?

{"name": "test1", "type": "string"}
{"name": "test2", "type": "string"}
Run Code Online (Sandbox Code Playgroud)

但是在发送和使用了一段时间之后,我需要修改架构以将第二个字段更改为long类型,然后引发以下异常:

Schema being registered is incompatible with an earlier schema; error code: 409
Run Code Online (Sandbox Code Playgroud)

我很困惑,如果架构注册表无法发展架构升级/更改,那为什么我应该使用架构注册表,或者为什么要使用Avro?

avro apache-kafka confluent confluent-schema-registry

5
推荐指数
4
解决办法
7434
查看次数

在没有 Confluent Schema Registry 的情况下在 KafkaConnect 中使用 Avro

我们在当前的基础设施中设置了 vanilla apache Kafka,我们开始记录一些我们想要使用 Kafka Connect 处理的数据。目前我们使用 Avro 作为我们的消息格式,但我们的基础设施中没有 Schema Registry。将来,我们计划用 Confluent 替换当前堆栈并使用 Schema Registry 和 Connect,但有一段时间我们只需要为此部署 Connect。

是否可以以某种方式配置 Connect 接收器,以便它们使用显式的 avsc 文件或架构,而无需连接到架构注册表,也不使用具有魔术字节和架构 ID 的 Confluent 格式?

avro apache-kafka apache-kafka-connect confluent-schema-registry

5
推荐指数
1
解决办法
3944
查看次数

带有 kafka-avro-console-consumer 的未知魔法字节

我一直在尝试将 kafka-avro-console-consumer 从 Confluent 连接到我们遗留的 Kafka 集群,该集群是在没有 Confluent Schema Registry 的情况下部署的。我使用以下属性显式提供了架构:

kafka-console-consumer --bootstrap-server kafka02.internal:9092 \
    --topic test \
    --from-beginning \
    --property key.schema='{"type":"long"}' \
    --property value.schema='{"type":"long"}'
Run Code Online (Sandbox Code Playgroud)

但我收到“未知的魔法字节!” 错误org.apache.kafka.common.errors.SerializationException

是否可以使用 Confluent kafka-avro-console-consumer 消费来自 Kafka 的 Avro 消息,这些消息未使用 Confluent 的 AvroSerializer 和 Schema Registry 序列化?

avro apache-kafka confluent-schema-registry confluent-platform

5
推荐指数
1
解决办法
1万
查看次数

Kafka Streams - SerializationException:未知的魔法字节

我正在尝试创建一个处理 Avro 记录的 Kafka Streams 应用程序,但出现以下错误:

Exception in thread "streams-application-c8031218-8de9-4d55-a5d0-81c30051a829-StreamThread-1" org.apache.kafka.streams.errors.StreamsException: Deserialization exception handler is set to fail upon a deserialization error. If you would rather have the streaming pipeline continue after a deserialization error, please set the default.deserialization.exception.handler appropriately.
at org.apache.kafka.streams.processor.internals.RecordDeserializer.deserialize(RecordDeserializer.java:74)
at org.apache.kafka.streams.processor.internals.RecordQueue.addRawRecords(RecordQueue.java:91)
at org.apache.kafka.streams.processor.internals.PartitionGroup.addRawRecords(PartitionGroup.java:117)
at org.apache.kafka.streams.processor.internals.StreamTask.addRecords(StreamTask.java:567)
at org.apache.kafka.streams.processor.internals.StreamThread.addRecordsToTasks(StreamThread.java:900)
at org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:801)
at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:749)
at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:719)
Caused by: org.apache.kafka.common.errors.SerializationException: Error deserializing Avro message for id -1
Caused by: org.apache.kafka.common.errors.SerializationException: Unknown magic byte!
Run Code Online (Sandbox Code Playgroud)

我不确定是什么导致了这个错误。我只是想先将 Avro 记录放入应用程序,然后在那里处理它们,然后输出到另一个主题,但它似乎不起作用。我已经包含了下面应用程序中的代码。谁能看出为什么它不起作用?

    Properties props = new Properties(); …
Run Code Online (Sandbox Code Playgroud)

java avro apache-kafka apache-kafka-streams confluent-schema-registry

5
推荐指数
1
解决办法
2万
查看次数

Avro无法使用更新的架构反序列化消息

我有一个已更新为包含新字段的架构。我正在使用avro反射和融合的架构注册表来反序列化/序列化数据,如下所示:

序列化:

Schema schema = REFLECT_DATA.getSchema(value.getClass());
try {
    int registeredSchemaId = this.schemaRegistry.register(subject, schema);

    ByteArrayOutputStream out = new ByteArrayOutputStream();
    out.write(0);
    out.write(ByteBuffer.allocate(4).putInt(registeredSchemaId).array());

    DatumWriter<Object> dw = new ReflectDatumWriter<>(schema);
    Encoder encoder = ENCODER_FACTORY.directBinaryEncoder(out, null);
    dw.write(value, encoder);
    encoder.flush();
    return out.toByteArray();
} catch (RuntimeException | IOException e) {
    throw new SerializationException("Error serializing Avro message", e);
} catch (RestClientException e) {
    throw new SerializationException("Error registering Avro schema: " + schema, e);
}
Run Code Online (Sandbox Code Playgroud)

反序列化:

if (readerSchema == null) {
    readerSchema = new Schema.Parser().parse(schemaString);
}

int schemaId = -1; …
Run Code Online (Sandbox Code Playgroud)

java scala avro confluent-schema-registry

5
推荐指数
1
解决办法
267
查看次数

无法启动 Confluence 架构注册表

我已经在 Ubuntu 16.04 机器上安装了合流平台,最初我已经配置了 Zookeeper、Kafka 和 ksql 并启动了合流平台。我能够看到下面的消息。

 root@DESKTOP-DIB3097:/opt/kafkafull/confluent-5.1.0/bin# ./confluent start
 This CLI is intended for development only, not for production
 https://docs.confluent.io/current/cli/index.html
 Using CONFLUENT_CURRENT: /tmp/confluent.HUlCltYT
 Starting zookeeper
 zookeeper is [UP]
 Starting kafka
 kafka is [UP]
 Starting schema-registry
 schema-registry is [UP]
 Starting kafka-rest
 kafka-rest is [UP]
 Starting connect
 connect is [UP]
 Starting ksql-server
 ksql-server is [UP]
 Starting control-center
 control-center is [UP]
Run Code Online (Sandbox Code Playgroud)

现在一切都准备好了,当我检查汇合平台的状态时,我发现架构注册表、连接和控制中心都已关闭。

我检查了模式注册表的日志并发现了以下日志。


ERROR Error starting the schema registry (io.confluent.kafka.schemaregistry.rest.SchemaRegistryRestApplication)
io.confluent.kafka.schemaregistry.exceptions.SchemaRegistryInitializationException: Error initializing kafka store while initializing schema registry
        at io.confluent.kafka.schemaregistry.storage.KafkaSchemaRegistry.init(KafkaSchemaRegistry.java:210) …
Run Code Online (Sandbox Code Playgroud)

apache-kafka confluent-schema-registry confluent-platform

5
推荐指数
2
解决办法
1万
查看次数

Kafka Connect - JDBC 源连接器 - 设置 Avro 架构

如何使 Kafka Connect JDBC 连接器连接到预定义的 Avro 架构?它在创建连接器时创建一个新版本。我正在阅读 DB2 并放入 Kafka 主题。我在创建过程中设置架构名称和版本,但它不起作用!这是我的连接器设置:

    {
      “名称”:“kafka-connect-jdbc-db2-tst-2”,
      “配置”:{
        "connector.class": "io.confluence.connect.jdbc.JdbcSourceConnector",
        "tasks.max": "1",
        "connection.url": "jdbc:db2://mydb2:50000/testdb",
        "连接.用户": "DB2INST1",
        "连接密码": "12345678",
        “查询”:“从TEST.MYVIEW4中选择CORRELATION_ID”,
        “模式”:“递增”,
        "incrementing.column.name": "CORRELATION_ID",
        "validate.non.null": "假",
        "topic.prefix": "tst-4" ,
        "auto.register.schemas": "假",
        "use.latest.version": "true",
        "transforms": "RenameField,SetSchemaMetadata",
        "transforms.RenameField.type": "org.apache.kafka.connect.transforms.ReplaceField$Value",
        "transforms.RenameField.renames": "CORRELATION_ID:id",
        "transforms.SetSchemaMetadata.type": "org.apache.kafka.connect.transforms.SetSchemaMetadata$Value",
        "transforms.SetSchemaMetadata.schema.name": "foo.bar.MyMessage",
        "transforms.SetSchemaMetadata.schema.version": "1"
    
      }
    
    }

以下是架构:V.1 是我的,V.2 是由 JDBC 源连接器创建的:

    $curl 本地主机:8081/subjects/tst-4-value/versions/1 | jq .
    
    {
      “主题”:“tst-4-值”,
      “版本”:1,
      “ID”:387,
      "schema": "{"type":"record","name":"MyMessage",
    "namespace":"foo.bar","fields":[{"name":"id","type":"int"}]}"
    }
    
    $curl 本地主机:8081/subjects/tst-4-value/versions/2 | jq .
    {
      “主题”:“tst-4-值”,
      “版本”:2,
      “ID”:386,
      "schema": "{"type":"record","name":"MyMessage","namespace":"foo.bar", …

avro apache-kafka apache-kafka-connect confluent-schema-registry

5
推荐指数
0
解决办法
1778
查看次数

如何使用架构注册表/Kafka-Rest 正确注册 Protobuf 架构

我正在尝试使用 kafka-rest 接口将 Protobuf 架构发布到架构注册表:

curl -X POST -H "Content-Type: application/vnd.kafka.protobuf.v2+json" \
   -H "Accept: application/vnd.kafka.v2+json" \
   --data '{"value_schema": "syntax=\"proto3\"; message User { string name = 1; }", "records": [{"value": {"name": "testUser"}}]}' \
   "http://localhost:8082/topics/protobuftest"
Run Code Online (Sandbox Code Playgroud)

我收到此错误:

{"error_code":415,"message":"HTTP 415 Unsupported Media Type"}
Run Code Online (Sandbox Code Playgroud)

问题:指示媒体类型使其发挥作用的正确方法是什么?

protocol-buffers apache-kafka confluent-schema-registry kafka-rest confluent-platform

5
推荐指数
1
解决办法
998
查看次数

具有逻辑类型的 Avro 模式不能与最新的 confluence-kafka 一起使用

我们使用 kafka 与 avro 架构,并且架构注册表设置为FULL兼容性。我们的模式使用logicalType字段,例如:

{
  "name": "MyRecord",
  "type": "record",
  "fields": [
    {
      "name": "created_at",
      "type": [
        "null",
        {
          "type": "long",
          "logicalType": "timestamp-millis"
        }
      ],
      "default": null
    }
  ]
}
Run Code Online (Sandbox Code Playgroud)

这对于我们正在使用的相当旧的版本来说工作得很好confluent-kafka,因为它依赖于avro-python31.8。但是,最近confluent-kafka依赖于avro-python31.10,消息序列化失败,并显示TypeError: unhashable type: 'mappingproxy'

我已经打开了一个 PR 来解决这个问题,但并没有引起太多关注。

假设它不会被合并,我还有哪些其他选项可以升级到最新版本confluent-kafka

我看到的唯一解决方案是摆脱logicalType,但这将是不兼容的架构更改,因此我要么放弃兼容性FULL,要么使用绑定到不同架构的不同主题。

即使上述方法有效,我也必须手动将毫秒转换为时间戳,这对我们的代码库来说是一个很大的变化。

python avro apache-kafka confluent-schema-registry

5
推荐指数
1
解决办法
1300
查看次数

kafka 流 protobuf 转换异常

我正在使用 Kafka 流来读取和处理 protobuf 消息。

我正在为流使用以下属性:


        Properties properties = new Properties();
        properties.put(ConsumerConfig.GROUP_ID_CONFIG, kafkaConfig.getGroupId());
        properties.put(StreamsConfig.CLIENT_ID_CONFIG, kafkaConfig.getClientId());
        properties.put(StreamsConfig.APPLICATION_ID_CONFIG, kafkaConfig.getApplicationId());
        properties.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaConfig.getBootstrapServers());

        properties.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.StringSerde.class);
        properties.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, KafkaProtobufSerde.class);
        properties.put(AbstractKafkaSchemaSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG, kafkaConfig.getSchemaRegistryUrl());
        properties.put(KafkaProtobufDeserializerConfig.SPECIFIC_PROTOBUF_VALUE_TYPE, ProtobufData.class);
        return properties;
    }
Run Code Online (Sandbox Code Playgroud)

但在运行时我遇到了这个错误:

Caused by: java.lang.ClassCastException: class com.google.protobuf.DynamicMessage cannot be cast to class model.schema.proto.input.ProtobufDataProto$ProtobufData (com.google.protobuf.DynamicMessage and model.schema.proto.input.ProtobufDataProto$ProtobufData are in unnamed module of loader 'app')

我的.proto文件如下所示:

import "inner_data.proto";
package myPackage;

option java_package = "model.schema.proto.input";
option java_outer_classname = "ProtobufDataProto";

message OuterData {
    string timestamp = 1;
    string x = 3;
    repeated InnerObject …
Run Code Online (Sandbox Code Playgroud)

java protocol-buffers apache-kafka-streams confluent-schema-registry

5
推荐指数
1
解决办法
1518
查看次数