Schema Registry rejects an unchanged schema as incompatible

Fre*_*076 5 avro apache-kafka apache-kafka-streams confluent-schema-registry

We have a Kafka cluster running with Avro schemas stored in Confluent's Schema Registry. On a recent redeployment of one of our streams applications, we started seeing incompatible-schema errors on a single topic (EmailSent). This is the only topic that fails, and we get the error whenever a new EmailSent event is produced to it.

Caused by: org.apache.kafka.common.errors.SerializationException: Error registering Avro schema: {"type":"record","name":"EmailSent","namespace":"com.company_name.communications.schemas","fields":[{"name":"customerId","type":"long","doc":"Customer's ID in the customers service"},{"name":"messageId","type":"long","doc":"The message id of the sent email"},{"name":"sentTime","type":{"type":"string","avro.java.string":"String"},"doc":"The campaign sent time in format 'yyyy-MM-dd HH:mm:ss.SSS'"},{"name":"campaignId","type":"long","doc":"The id of the campaign in the marketing suite"},{"name":"appId","type":["null","long"],"doc":"The app id associated with the sent email, if the email was related to a specific application","default":null}],"version":1}
Caused by: io.confluent.kafka.schemaregistry.client.rest.exceptions.RestClientException: Schema being registered is incompatible with an earlier schema; error code: 409
    at io.confluent.kafka.schemaregistry.client.rest.RestService.sendHttpRequest(RestService.java:170)
    at io.confluent.kafka.schemaregistry.client.rest.RestService.httpRequest(RestService.java:187)
    at io.confluent.kafka.schemaregistry.client.rest.RestService.registerSchema(RestService.java:238)
    at io.confluent.kafka.schemaregistry.client.rest.RestService.registerSchema(RestService.java:230)
    at io.confluent.kafka.schemaregistry.client.rest.RestService.registerSchema(RestService.java:225)
    at io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient.registerAndGetId(CachedSchemaRegistryClient.java:59)
    at io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient.register(CachedSchemaRegistryClient.java:91)
    at io.confluent.kafka.serializers.AbstractKafkaAvroSerializer.serializeImpl(AbstractKafkaAvroSerializer.java:72)
    at io.confluent.kafka.serializers.KafkaAvroSerializer.serialize(KafkaAvroSerializer.java:54)
    at org.apache.kafka.streams.processor.internals.RecordCollectorImpl.send(RecordCollectorImpl.java:91)
    at org.apache.kafka.streams.processor.internals.RecordCollectorImpl.send(RecordCollectorImpl.java:78)
    at org.apache.kafka.streams.processor.internals.SinkNode.process(SinkNode.java:87)
    at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:85)
    at org.apache.kafka.streams.kstream.internals.KStreamFilter$KStreamFilterProcessor.process(KStreamFilter.java:43)
    at org.apache.kafka.streams.processor.internals.ProcessorNode$1.run(ProcessorNode.java:46)
    at org.apache.kafka.streams.processor.internals.StreamsMetricsImpl.measureLatencyNs(StreamsMetricsImpl.java:211)
    at org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:124)
    at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:85)
    at org.apache.kafka.streams.kstream.internals.KStreamMap$KStreamMapProcessor.process(KStreamMap.java:42)
    at org.apache.kafka.streams.processor.internals.ProcessorNode$1.run(ProcessorNode.java:46)
    at org.apache.kafka.streams.processor.internals.StreamsMetricsImpl.measureLatencyNs(StreamsMetricsImpl.java:211)
    at org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:124)
    at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:85)
    at org.apache.kafka.streams.kstream.internals.KStreamPeek$KStreamPeekProcessor.process(KStreamPeek.java:44)
    at org.apache.kafka.streams.processor.internals.ProcessorNode$1.run(ProcessorNode.java:46)
    at org.apache.kafka.streams.processor.internals.StreamsMetricsImpl.measureLatencyNs(StreamsMetricsImpl.java:211)
    at org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:124)
    at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:85)
    at org.apache.kafka.streams.kstream.internals.KStreamMapValues$KStreamMapProcessor.process(KStreamMapValues.java:41)
    at org.apache.kafka.streams.processor.internals.ProcessorNode$1.run(ProcessorNode.java:46)
    at org.apache.kafka.streams.processor.internals.StreamsMetricsImpl.measureLatencyNs(StreamsMetricsImpl.java:211)
    at org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:124)
    at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:85)
    at org.apache.kafka.streams.kstream.internals.ForwardingCacheFlushListener.apply(ForwardingCacheFlushListener.java:42)
    at org.apache.kafka.streams.state.internals.CachingKeyValueStore.putAndMaybeForward(CachingKeyValueStore.java:92)
    at org.apache.kafka.streams.state.internals.CachingKeyValueStore.access$000(CachingKeyValueStore.java:35)
    at org.apache.kafka.streams.state.internals.CachingKeyValueStore$1.apply(CachingKeyValueStore.java:79)
    at org.apache.kafka.streams.state.internals.NamedCache.flush(NamedCache.java:141)
    at org.apache.kafka.streams.state.internals.NamedCache.evict(NamedCache.java:232)
    at org.apache.kafka.streams.state.internals.ThreadCache.maybeEvict(ThreadCache.java:245)
    at org.apache.kafka.streams.state.internals.ThreadCache.put(ThreadCache.java:153)
    at org.apache.kafka.streams.state.internals.CachingKeyValueStore.putInternal(CachingKeyValueStore.java:193)
    at org.apache.kafka.streams.state.internals.CachingKeyValueStore.put(CachingKeyValueStore.java:188)
    at org.apache.kafka.streams.state.internals.CachingKeyValueStore.put(CachingKeyValueStore.java:35)
    at org.apache.kafka.streams.state.internals.InnerMeteredKeyValueStore.put(InnerMeteredKeyValueStore.java:199)
    at org.apache.kafka.streams.state.internals.MeteredKeyValueBytesStore.put(MeteredKeyValueBytesStore.java:121)
    at org.apache.kafka.streams.kstream.internals.KTableSource$KTableSourceProcessor.process(KTableSource.java:63)
    at org.apache.kafka.streams.processor.internals.ProcessorNode$1.run(ProcessorNode.java:46)
    at org.apache.kafka.streams.processor.internals.StreamsMetricsImpl.measureLatencyNs(StreamsMetricsImpl.java:211)
    at org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:124)
    at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:85)
    at org.apache.kafka.streams.processor.internals.SourceNode.process(SourceNode.java:80)
    at org.apache.kafka.streams.processor.internals.StreamTask.process(StreamTask.java:222)
    at org.apache.kafka.streams.processor.internals.AssignedTasks.process(AssignedTasks.java:409)
    at org.apache.kafka.streams.processor.internals.TaskManager.process(TaskManager.java:308)
    at org.apache.kafka.streams.processor.internals.StreamThread.processAndMaybeCommit(StreamThread.java:939)
    at org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:819)
    at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:771)
    at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:741)

This schema has been unchanged since June 2018, and we had been processing EmailSent events successfully until now.

The PR associated with this deployment of our Streams application does not change the schema, the stream processor that throws the error, or any of the streams application's dependencies. My suspicion lies with the Schema Registry. Has anyone had a similar experience, or any insight into what could be causing the failure? I can't find any information about error code 409 — does it ring a bell for anyone?

Thanks in advance.

cri*_*007 1

I don't think the server would lie. You haven't shown us the two schemas so we can compare them (the one stored in the registry versus the one in the error message).
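A quick way to see what actually changed is to diff the candidate schema from the error message against the one currently registered (fetched, e.g., with GET /subjects/EmailSent-value/versions/latest — the subject name here assumes the default topic-name strategy). A minimal sketch of such a diff; the "registered" schema below is hypothetical, invented only to show the kind of subtle difference (here, the `avro.java.string` property) that can trip compatibility checks:

```python
import json

def field_map(schema):
    # Map field name -> (canonical type JSON, default) for a record schema.
    return {f["name"]: (json.dumps(f["type"], sort_keys=True), f.get("default", "<none>"))
            for f in schema["fields"]}

def diff_schemas(registered, candidate):
    """Return human-readable field-level differences between two Avro record schemas."""
    a, b = field_map(registered), field_map(candidate)
    diffs = []
    for name in sorted(a.keys() | b.keys()):
        if name not in a:
            diffs.append(f"field '{name}' only in candidate: {b[name]}")
        elif name not in b:
            diffs.append(f"field '{name}' only in registered: {a[name]}")
        elif a[name] != b[name]:
            diffs.append(f"field '{name}' differs: {a[name]} vs {b[name]}")
    return diffs

# Hypothetical: the registered schema types sentTime as a plain "string",
# while the candidate adds the avro.java.string property.
registered = json.loads('{"type":"record","name":"EmailSent","fields":['
                        '{"name":"customerId","type":"long"},'
                        '{"name":"sentTime","type":"string"}]}')
candidate = json.loads('{"type":"record","name":"EmailSent","fields":['
                       '{"name":"customerId","type":"long"},'
                       '{"name":"sentTime","type":{"type":"string","avro.java.string":"String"}}]}')
for d in diff_schemas(registered, candidate):
    print(d)
```

Note this only compares fields structurally; the registry's real check additionally applies Avro resolution rules per the subject's compatibility level.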

One way to work around the problem is to set the subject's compatibility config to NONE,

export KAFKA_TOPIC=logEvents
curl -X PUT -H "Content-Type:application/json" http://schema-registry:8081/config/${KAFKA_TOPIC}-value -d '{"compatibility": "NONE"}' 

(and do the same for ${KAFKA_TOPIC}-key, if needed),

then upload your new schema.

  1. Set it back to BACKWARD compatibility (or your original config) once you're done.
  2. Be aware this can break Avro consumers that have to read messages written with both the old schema and the new, incompatible one.
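The risk in point 2 follows from Avro's schema-resolution rules: a consumer decoding with the old (reader) schema can only handle records written with the new (writer) schema if every reader field is either present in the writer schema or carries a default. A minimal illustration of that one rule — not the full Avro resolution algorithm, and the dropped `campaignId` field is hypothetical:

```python
def unresolvable_fields(reader_schema, writer_schema):
    """Reader fields absent from the writer schema that have no default --
    these make consumers on the old schema fail on new records."""
    writer_fields = {f["name"] for f in writer_schema["fields"]}
    return [f["name"] for f in reader_schema["fields"]
            if f["name"] not in writer_fields and "default" not in f]

# Hypothetical: a new, incompatible writer schema dropped 'campaignId'.
old_reader = {"type": "record", "name": "EmailSent", "fields": [
    {"name": "customerId", "type": "long"},
    {"name": "campaignId", "type": "long"},  # no default -> unresolvable
]}
new_writer = {"type": "record", "name": "EmailSent", "fields": [
    {"name": "customerId", "type": "long"},
]}
print(unresolvable_fields(old_reader, new_writer))  # ['campaignId']
```

This is exactly the check that BACKWARD compatibility enforces on your behalf, which is why it should be restored after the one-off upload.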