Sha*_*oor 0 regex jdbc apache-kafka apache-kafka-connect confluent-platform
我正在使用这个debezium-examples
我在jdbc-sink.json 中添加了"topics.regex": "CID1122.(.*)"如下
{
"name": "jdbc-sink",
"config": {
"connector.class": "io.confluent.connect.jdbc.JdbcSinkConnector",
"tasks.max": "1",
"topics.regex": "CID1122.(.*)",
"connection.url": "jdbc:mysql://mysql:3306/inventory?verifyServerCertificate=false",
"connection.user": "root",
"connection.password": "debezium",
"auto.create": "true",
"transforms": "unwrap",
"transforms.unwrap.type": "io.debezium.transforms.UnwrapFromEnvelope",
"name": "jdbc-sink",
"insert.mode": "upsert",
"pk.fields": "id,companyId",
"pk.mode": "record_value"
}
}
Run Code Online (Sandbox Code Playgroud)
Kafka 主题列表是
CID1122.department
CID1122.designation
CID1122.employee
Run Code Online (Sandbox Code Playgroud)
我面对卡夫卡 java.lang.NullPointerException
connect_1 | 2019-01-30 06:14:47,302 INFO || Checking MySql dialect for existence of table "CID1122"."employee" [io.confluent.connect.jdbc.dialect.MySqlDatabaseDialect]
connect_1 | 2019-01-30 06:14:47,303 INFO || Using MySql dialect table "CID1122"."employee" absent [io.confluent.connect.jdbc.dialect.MySqlDatabaseDialect]
connect_1 | 2019-01-30 06:14:47,342 INFO || Checking MySql dialect for existence of table "CID1122"."employee" [io.confluent.connect.jdbc.dialect.MySqlDatabaseDialect]
connect_1 | 2019-01-30 06:14:47,343 INFO || Using MySql dialect table "CID1122"."employee" absent [io.confluent.connect.jdbc.dialect.MySqlDatabaseDialect]
connect_1 | 2019-01-30 06:14:47,344 ERROR || WorkerSinkTask{id=jdbc-sink-0} Task threw an uncaught and unrecoverable exception. Task is being killed and will not recover until manually restarted. [org.apache.kafka.connect.runtime.WorkerSinkTask]
connect_1 | java.lang.NullPointerException
connect_1 | at io.confluent.connect.jdbc.sink.DbStructure.amendIfNecessary(DbStructure.java:124)
connect_1 | at io.confluent.connect.jdbc.sink.DbStructure.createOrAmendIfNecessary(DbStructure.java:75)
connect_1 | at io.confluent.connect.jdbc.sink.BufferedRecords.add(BufferedRecords.java:86)
connect_1 | at io.confluent.connect.jdbc.sink.JdbcDbWriter.write(JdbcDbWriter.java:63)
connect_1 | at io.confluent.connect.jdbc.sink.JdbcSinkTask.put(JdbcSinkTask.java:75)
connect_1 | at org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:565)
connect_1 | at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:323)
connect_1 | at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:226)
connect_1 | at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:194)
connect_1 | at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:175)
connect_1 | at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:219)
connect_1 | at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
connect_1 | at java.util.concurrent.FutureTask.run(FutureTask.java:266)
connect_1 | at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
connect_1 | at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
connect_1 | at java.lang.Thread.run(Thread.java:748)
connect_1 | 2019-01-30 06:14:47,345 ERROR || WorkerSinkTask{id=jdbc-sink-0} Task threw an uncaught and unrecoverable exception [org.apache.kafka.connect.runtime.WorkerTask]
connect_1 | org.apache.kafka.connect.errors.ConnectException: Exiting WorkerSinkTask due to unrecoverable exception.
connect_1 | at org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:587)
connect_1 | at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:323)
connect_1 | at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:226)
connect_1 | at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:194)
connect_1 | at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:175)
connect_1 | at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:219)
connect_1 | at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
connect_1 | at java.util.concurrent.FutureTask.run(FutureTask.java:266)
connect_1 | at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
connect_1 | at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
connect_1 | at java.lang.Thread.run(Thread.java:748)
connect_1 | Caused by: java.lang.NullPointerException
connect_1 | at io.confluent.connect.jdbc.sink.DbStructure.amendIfNecessary(DbStructure.java:124)
connect_1 | at io.confluent.connect.jdbc.sink.DbStructure.createOrAmendIfNecessary(DbStructure.java:75)
connect_1 | at io.confluent.connect.jdbc.sink.BufferedRecords.add(BufferedRecords.java:86)
connect_1 | at io.confluent.connect.jdbc.sink.JdbcDbWriter.write(JdbcDbWriter.java:63)
connect_1 | at io.confluent.connect.jdbc.sink.JdbcSinkTask.put(JdbcSinkTask.java:75)
connect_1 | at org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:565)
connect_1 | ... 10 more
connect_1 | 2019-01-30 06:14:47,345 ERROR || WorkerSinkTask{id=jdbc-sink-0} Task is being killed and will not recover until manually restarted [org.apache.kafka.connect.runtime.WorkerTask]
Run Code Online (Sandbox Code Playgroud)
任何解决方法?
您缺少table.name.format属性 https://docs.confluent.io/3.1.1/connect/connect-jdbc/docs/sink_config_options.html(数据映射部分)
这是一个工作示例:
{
"name": "test-0005",
"connector.class": "io.confluent.connect.jdbc.JdbcSinkConnector",
"topics.regex": "CID1122.(.*)",
"connection.user": "kafka",
"table.name.format": "${topic}",
"connection.password": "kafka",
"connection.url": "jdbc:mysql://databasehost:3306/dbname",
"auto.create": "true",
"transforms": "route",
"transforms.t1.replacement": "$2",
"transforms.route.regex": "([^.]+)\\.([^.]+)",
"transforms.route.type": "org.apache.kafka.connect.transforms.RegexRouter"
}
Run Code Online (Sandbox Code Playgroud)
怎么了?
如您所见,我添加了RegexRouter 转换,以便在下沉到 MySQL 之前动态提取主题名称,我使用的模式:([^.]+)\.([^.]+)是匹配我们的主题 。regex CID1122。 [事件名称]然后我只提取了第 2 组(事件名称)。
最后,这组$2将作为${topic}传递给table.name.format,然后您可以连接到您的数据库并检查您的数据。
| 归档时间: |
|
| 查看次数: |
3580 次 |
| 最近记录: |