我正在尝试在docker的帮助下使用kafka debezium(Kafka流)将一个数据库的表数据下沉到另一个数据库。数据库流工作正常。但流式数据接收另一个 MySQL DB 进程时出现错误。
对于我的连接器接收器配置如下。
{
"name": "mysql_sink",
"config": {
"connector.class": "io.confluent.connect.jdbc.JdbcSinkConnector",
"topics": "mysql-connect.kafka_test.employee",
"connection.url": "jdbc:mysql://localhost/kafka_test_1&user=debezium&password=xxxxx",
"auto.create": "true",
"auto.evolve": "true",
"insert.mode": "upsert",
"pk.fields": "id",
"pk.mode": "record_value",
"errors.tolerance": "all",
"errors.log.enable":"true",
"errors.log.include.messages":"true",
"key.converter": "org.apache.kafka.connect.json.JsonConverter",
"value.converter": "org.apache.kafka.connect.json.JsonConverter",
"key.converter.schemas.enable": "false",
"value.converter.schemas.enable": "false",
"name": "mysql_sink"
}
}
Run Code Online (Sandbox Code Playgroud)
但我收到错误。
org.apache.kafka.connect.errors.ConnectException: Exiting WorkerSinkTask due to unrecoverable exception.
org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:560)
org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:321)
org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:224)
org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:192)
org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:175)
org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:219)
java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
java.util.concurrent.FutureTask.run(FutureTask.java:266)
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
java.lang.Thread.run(Thread.java:748)\nCaused by: org.apache.kafka.connect.errors.ConnectException: java.sql.SQLException: No suitable driver found for jdbc:mysql://localhost/kafka_test_1&user=debezium&password=xxxxx
io.confluent.connect.jdbc.util.CachedConnectionProvider.getValidConnection(CachedConnectionProvider.java:59)
io.confluent.connect.jdbc.sink.JdbcDbWriter.write(JdbcDbWriter.java:52)
io.confluent.connect.jdbc.sink.JdbcSinkTask.put(JdbcSinkTask.java:66)
org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:538)\n\t... 10 more\nCaused by: java.sql.SQLException: No …Run Code Online (Sandbox Code Playgroud)