我正在使用Kafka模式注册表来生成/使用Kafka消息,例如,我有两个字段均为字符串类型,伪模式如下所示?
{"name": "test1", "type": "string"}
{"name": "test2", "type": "string"}
Run Code Online (Sandbox Code Playgroud)
但是在发送和使用了一段时间之后,我需要修改架构以将第二个字段更改为long类型,然后引发以下异常:
Schema being registered is incompatible with an earlier schema; error code: 409
Run Code Online (Sandbox Code Playgroud)
我很困惑,如果架构注册表无法发展架构升级/更改,那为什么我应该使用架构注册表,或者为什么要使用Avro?
我们在当前的基础设施中设置了 vanilla apache Kafka,我们开始记录一些我们想要使用 Kafka Connect 处理的数据。目前我们使用 Avro 作为我们的消息格式,但我们的基础设施中没有 Schema Registry。将来,我们计划用 Confluent 替换当前堆栈并使用 Schema Registry 和 Connect,但有一段时间我们只需要为此部署 Connect。
是否可以以某种方式配置 Connect 接收器,以便它们使用显式的 avsc 文件或架构,而无需连接到架构注册表,也不使用具有魔术字节和架构 ID 的 Confluent 格式?
avro apache-kafka apache-kafka-connect confluent-schema-registry
我一直在尝试将 kafka-avro-console-consumer 从 Confluent 连接到我们遗留的 Kafka 集群,该集群是在没有 Confluent Schema Registry 的情况下部署的。我使用以下属性显式提供了架构:
kafka-console-consumer --bootstrap-server kafka02.internal:9092 \
--topic test \
--from-beginning \
--property key.schema='{"type":"long"}' \
--property value.schema='{"type":"long"}'
Run Code Online (Sandbox Code Playgroud)
但我收到“未知的魔法字节!” 错误org.apache.kafka.common.errors.SerializationException
是否可以使用 Confluent kafka-avro-console-consumer 消费来自 Kafka 的 Avro 消息,这些消息未使用 Confluent 的 AvroSerializer 和 Schema Registry 序列化?
avro apache-kafka confluent-schema-registry confluent-platform
我正在尝试创建一个处理 Avro 记录的 Kafka Streams 应用程序,但出现以下错误:
Exception in thread "streams-application-c8031218-8de9-4d55-a5d0-81c30051a829-StreamThread-1" org.apache.kafka.streams.errors.StreamsException: Deserialization exception handler is set to fail upon a deserialization error. If you would rather have the streaming pipeline continue after a deserialization error, please set the default.deserialization.exception.handler appropriately.
at org.apache.kafka.streams.processor.internals.RecordDeserializer.deserialize(RecordDeserializer.java:74)
at org.apache.kafka.streams.processor.internals.RecordQueue.addRawRecords(RecordQueue.java:91)
at org.apache.kafka.streams.processor.internals.PartitionGroup.addRawRecords(PartitionGroup.java:117)
at org.apache.kafka.streams.processor.internals.StreamTask.addRecords(StreamTask.java:567)
at org.apache.kafka.streams.processor.internals.StreamThread.addRecordsToTasks(StreamThread.java:900)
at org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:801)
at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:749)
at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:719)
Caused by: org.apache.kafka.common.errors.SerializationException: Error deserializing Avro message for id -1
Caused by: org.apache.kafka.common.errors.SerializationException: Unknown magic byte!
Run Code Online (Sandbox Code Playgroud)
我不确定是什么导致了这个错误。我只是想先将 Avro 记录放入应用程序,然后在那里处理它们,然后输出到另一个主题,但它似乎不起作用。我已经包含了下面应用程序中的代码。谁能看出为什么它不起作用?
Properties props = new Properties(); …Run Code Online (Sandbox Code Playgroud) java avro apache-kafka apache-kafka-streams confluent-schema-registry
我有一个已更新为包含新字段的架构。我正在使用avro反射和融合的架构注册表来反序列化/序列化数据,如下所示:
序列化:
Schema schema = REFLECT_DATA.getSchema(value.getClass());
try {
int registeredSchemaId = this.schemaRegistry.register(subject, schema);
ByteArrayOutputStream out = new ByteArrayOutputStream();
out.write(0);
out.write(ByteBuffer.allocate(4).putInt(registeredSchemaId).array());
DatumWriter<Object> dw = new ReflectDatumWriter<>(schema);
Encoder encoder = ENCODER_FACTORY.directBinaryEncoder(out, null);
dw.write(value, encoder);
encoder.flush();
return out.toByteArray();
} catch (RuntimeException | IOException e) {
throw new SerializationException("Error serializing Avro message", e);
} catch (RestClientException e) {
throw new SerializationException("Error registering Avro schema: " + schema, e);
}
Run Code Online (Sandbox Code Playgroud)
反序列化:
if (readerSchema == null) {
readerSchema = new Schema.Parser().parse(schemaString);
}
int schemaId = -1; …Run Code Online (Sandbox Code Playgroud) 我已经在 Ubuntu 16.04 机器上安装了合流平台,最初我已经配置了 Zookeeper、Kafka 和 ksql 并启动了合流平台。我能够看到下面的消息。
root@DESKTOP-DIB3097:/opt/kafkafull/confluent-5.1.0/bin# ./confluent start
This CLI is intended for development only, not for production
https://docs.confluent.io/current/cli/index.html
Using CONFLUENT_CURRENT: /tmp/confluent.HUlCltYT
Starting zookeeper
zookeeper is [UP]
Starting kafka
kafka is [UP]
Starting schema-registry
schema-registry is [UP]
Starting kafka-rest
kafka-rest is [UP]
Starting connect
connect is [UP]
Starting ksql-server
ksql-server is [UP]
Starting control-center
control-center is [UP]
Run Code Online (Sandbox Code Playgroud)
现在一切都准备好了,当我检查汇合平台的状态时,我发现架构注册表、连接和控制中心都已关闭。
我检查了模式注册表的日志并发现了以下日志。
ERROR Error starting the schema registry (io.confluent.kafka.schemaregistry.rest.SchemaRegistryRestApplication)
io.confluent.kafka.schemaregistry.exceptions.SchemaRegistryInitializationException: Error initializing kafka store while initializing schema registry
at io.confluent.kafka.schemaregistry.storage.KafkaSchemaRegistry.init(KafkaSchemaRegistry.java:210) …Run Code Online (Sandbox Code Playgroud) 如何使 Kafka Connect JDBC 连接器连接到预定义的 Avro 架构?它在创建连接器时创建一个新版本。我正在阅读 DB2 并放入 Kafka 主题。我在创建过程中设置架构名称和版本,但它不起作用!这是我的连接器设置:
{
“名称”:“kafka-connect-jdbc-db2-tst-2”,
“配置”:{
"connector.class": "io.confluence.connect.jdbc.JdbcSourceConnector",
"tasks.max": "1",
"connection.url": "jdbc:db2://mydb2:50000/testdb",
"连接.用户": "DB2INST1",
"连接密码": "12345678",
“查询”:“从TEST.MYVIEW4中选择CORRELATION_ID”,
“模式”:“递增”,
"incrementing.column.name": "CORRELATION_ID",
"validate.non.null": "假",
"topic.prefix": "tst-4" ,
"auto.register.schemas": "假",
"use.latest.version": "true",
"transforms": "RenameField,SetSchemaMetadata",
"transforms.RenameField.type": "org.apache.kafka.connect.transforms.ReplaceField$Value",
"transforms.RenameField.renames": "CORRELATION_ID:id",
"transforms.SetSchemaMetadata.type": "org.apache.kafka.connect.transforms.SetSchemaMetadata$Value",
"transforms.SetSchemaMetadata.schema.name": "foo.bar.MyMessage",
"transforms.SetSchemaMetadata.schema.version": "1"
}
}
以下是架构:V.1 是我的,V.2 是由 JDBC 源连接器创建的:
$curl 本地主机:8081/subjects/tst-4-value/versions/1 | jq .
{
“主题”:“tst-4-值”,
“版本”:1,
“ID”:387,
"schema": "{"type":"record","name":"MyMessage",
"namespace":"foo.bar","fields":[{"name":"id","type":"int"}]}"
}
$curl 本地主机:8081/subjects/tst-4-value/versions/2 | jq .
{
“主题”:“tst-4-值”,
“版本”:2,
“ID”:386,
"schema": "{"type":"record","name":"MyMessage","namespace":"foo.bar", … avro apache-kafka apache-kafka-connect confluent-schema-registry
我正在尝试使用 kafka-rest 接口将 Protobuf 架构发布到架构注册表:
curl -X POST -H "Content-Type: application/vnd.kafka.protobuf.v2+json" \
-H "Accept: application/vnd.kafka.v2+json" \
--data '{"value_schema": "syntax=\"proto3\"; message User { string name = 1; }", "records": [{"value": {"name": "testUser"}}]}' \
"http://localhost:8082/topics/protobuftest"
Run Code Online (Sandbox Code Playgroud)
我收到此错误:
{"error_code":415,"message":"HTTP 415 Unsupported Media Type"}
Run Code Online (Sandbox Code Playgroud)
问题:指示媒体类型使其发挥作用的正确方法是什么?
protocol-buffers apache-kafka confluent-schema-registry kafka-rest confluent-platform
我们使用 kafka 与 avro 架构,并且架构注册表设置为FULL兼容性。我们的模式使用logicalType字段,例如:
{
"name": "MyRecord",
"type": "record",
"fields": [
{
"name": "created_at",
"type": [
"null",
{
"type": "long",
"logicalType": "timestamp-millis"
}
],
"default": null
}
]
}
Run Code Online (Sandbox Code Playgroud)
这对于我们正在使用的相当旧的版本来说工作得很好confluent-kafka,因为它依赖于avro-python31.8。但是,最近confluent-kafka依赖于avro-python31.10,消息序列化失败,并显示TypeError: unhashable type: 'mappingproxy'
我已经打开了一个 PR 来解决这个问题,但并没有引起太多关注。
假设它不会被合并,我还有哪些其他选项可以升级到最新版本confluent-kafka?
我看到的唯一解决方案是摆脱logicalType,但这将是不兼容的架构更改,因此我要么放弃兼容性FULL,要么使用绑定到不同架构的不同主题。
即使上述方法有效,我也必须手动将毫秒转换为时间戳,这对我们的代码库来说是一个很大的变化。
我正在使用 Kafka 流来读取和处理 protobuf 消息。
我正在为流使用以下属性:
Properties properties = new Properties();
properties.put(ConsumerConfig.GROUP_ID_CONFIG, kafkaConfig.getGroupId());
properties.put(StreamsConfig.CLIENT_ID_CONFIG, kafkaConfig.getClientId());
properties.put(StreamsConfig.APPLICATION_ID_CONFIG, kafkaConfig.getApplicationId());
properties.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaConfig.getBootstrapServers());
properties.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.StringSerde.class);
properties.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, KafkaProtobufSerde.class);
properties.put(AbstractKafkaSchemaSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG, kafkaConfig.getSchemaRegistryUrl());
properties.put(KafkaProtobufDeserializerConfig.SPECIFIC_PROTOBUF_VALUE_TYPE, ProtobufData.class);
return properties;
}
Run Code Online (Sandbox Code Playgroud)
但在运行时我遇到了这个错误:
Caused by: java.lang.ClassCastException: class com.google.protobuf.DynamicMessage cannot be cast to class model.schema.proto.input.ProtobufDataProto$ProtobufData (com.google.protobuf.DynamicMessage and model.schema.proto.input.ProtobufDataProto$ProtobufData are in unnamed module of loader 'app')
我的.proto文件如下所示:
import "inner_data.proto";
package myPackage;
option java_package = "model.schema.proto.input";
option java_outer_classname = "ProtobufDataProto";
message OuterData {
string timestamp = 1;
string x = 3;
repeated InnerObject …Run Code Online (Sandbox Code Playgroud) java protocol-buffers apache-kafka-streams confluent-schema-registry