Gio*_*ous 8 avro apache-kafka confluent-schema-registry
I have a sink connector with the following configuration:
{
"name": "sink-test-mariadb-MY_TOPIC",
"config": {
"connector.class":"io.confluent.connect.jdbc.JdbcSinkConnector",
"tasks.max":"10",
"topics":"MY_TOPIC",
"connection.url":"jdbc:mariadb://localhost:3306/myschema?user=myuser&password=mypass",
"auto.create":"false",
"auto.evolve":"true",
"table.name.format":"MY_TABLE",
"pk.mode":"record_value",
"pk.fields":"ID",
"insert.mode":"upsert",
"transforms":"ExtractField",
"transforms.ExtractField.type":"org.apache.kafka.connect.transforms.ExtractField$Value",
"transforms.ExtractField.field":"data"
}
}
After a while, all of the connector's tasks fail with the following error:
{
"state": "FAILED",
"trace": "org.apache.kafka.connect.errors.DataException: MY_TOPIC
at io.confluent.connect.avro.AvroConverter.toConnectData(AvroConverter.java:95)
at org.apache.kafka.connect.runtime.WorkerSinkTask.convertMessages(WorkerSinkTask.java:468)
at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:301)
at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:205)
at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:173)
at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:170)
at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:214)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
Caused by: org.apache.kafka.common.errors.SerializationException: Error retrieving Avro schema for id 802
Caused by: io.confluent.kafka.schemaregistry.client.rest.exceptions.RestClientException: Schema not found; error code: 40403
at io.confluent.kafka.schemaregistry.client.rest.RestService.sendHttpRequest(RestService.java:202)
at io.confluent.kafka.schemaregistry.client.rest.RestService.httpRequest(RestService.java:229)
at io.confluent.kafka.schemaregistry.client.rest.RestService.getId(RestService.java:409)
at io.confluent.kafka.schemaregistry.client.rest.RestService.getId(RestService.java:402)
at io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient.getSchemaByIdFromRegistry(CachedSchemaRegistryClient.java:119)
at io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient.getBySubjectAndId(CachedSchemaRegistryClient.java:192)
at io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient.getById(CachedSchemaRegistryClient.java:168)
at io.confluent.kafka.serializers.AbstractKafkaAvroDeserializer.deserialize(AbstractKafkaAvroDeserializer.java:121)
at io.confluent.kafka.serializers.AbstractKafkaAvroDeserializer.deserializeWithSchemaAndVersion(AbstractKafkaAvroDeserializer.java:194)
at io.confluent.connect.avro.AvroConverter$Deserializer.deserialize(AvroConverter.java:120)
at io.confluent.connect.avro.AvroConverter.toConnectData(AvroConverter.java:83)
at org.apache.kafka.connect.runtime.WorkerSinkTask.convertMessages(WorkerSinkTask.java:468)
at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:301)
at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:205)
at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:173)
at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:170)
at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:214)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)",
"id": 0,
"worker_id": "localhost:8083"
}
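The connector managed to sync the topic with the database for a while, then suddenly failed for no apparent reason. I am also quite sure that the schema is there: its subject appears in the list returned by the Schema Registry API at localhost:8081/subjects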
[
...
MY_TOPIC-value
...
]
小智 12
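I ran into the same problem and realized that code 40403 does not really mean the schema was not found; it means the schema does not correspond to the one that is required. A different code (40401) is used when the schema is not found at all.
So I changed the schema accordingly, and that worked for me.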
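The messages on the Kafka topic were serialized with a schema that is not among the ones in your Schema Registry. Perhaps they were produced by a tool that registered its schema in a different Schema Registry, or in a different environment? To deserialize a message, Kafka Connect must be able to retrieve the schema whose ID is embedded at the start of each Kafka message on that topic (a magic byte followed by the schema ID).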
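To see which schema ID a given message actually carries, you can read it from the raw bytes. This is a minimal sketch, assuming the message uses the Confluent wire format (one zero magic byte followed by a 4-byte big-endian schema ID); the function name `extract_schema_id` is hypothetical and not part of any library.

```python
import struct

MAGIC_BYTE = 0  # assumed first byte of a Confluent-framed message


def extract_schema_id(payload: bytes) -> int:
    """Return the schema ID embedded in the first five bytes of a message."""
    if len(payload) < 5:
        raise ValueError("payload too short to carry a schema ID")
    # ">BI" = big-endian, 1 unsigned byte (magic) + 4-byte unsigned int (ID)
    magic, schema_id = struct.unpack(">BI", payload[:5])
    if magic != MAGIC_BYTE:
        raise ValueError(f"unexpected magic byte {magic}")
    return schema_id
```

Feeding it the raw value bytes of a failing record (for example, as read by a plain byte-array consumer) should give the ID that Kafka Connect then tries to look up, such as the 802 in the stack trace above.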
That schema does not exist in your Schema Registry, as this request shows:
GET /schemas/ids/803
{ "error_code": 40403, "message": "Schema not found" }
You can check the ID of the schema you do have with:
curl -s "http://localhost:8081/subjects/MY_TOPIC-value/versions/3/"|jq '.id'
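The same check can be run over every registered version of the subject rather than one version at a time. This is a rough sketch using only the Python standard library and the Schema Registry REST endpoints `/subjects/{subject}/versions` and `/subjects/{subject}/versions/{version}`; the helper names and the injectable `fetch` parameter are my own assumptions, added so the logic can be tried without a running registry.

```python
import json
from urllib.request import urlopen


def fetch_json(url: str):
    """GET a URL and decode its JSON body."""
    with urlopen(url) as resp:
        return json.load(resp)


def schema_ids_for_subject(base_url: str, subject: str, fetch=fetch_json) -> dict:
    """Map each registered version of a subject to its schema ID."""
    versions = fetch(f"{base_url}/subjects/{subject}/versions")
    return {
        version: fetch(f"{base_url}/subjects/{subject}/versions/{version}")["id"]
        for version in versions
    }


# Example call against the registry from the question (needs network access):
# ids = schema_ids_for_subject("http://localhost:8081", "MY_TOPIC-value")
# print(802 in ids.values())
```

If the ID from the stack trace is not among the returned values, the messages were most likely written against a schema that this registry never stored under that subject.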