我正在尝试对 MySQL 使用模式时间戳,行数有限,因为我的表大小为 2.6 GB。
以下是我正在使用的连接器属性:
{
"name": "jdbc_source_mysql_registration_query",
"config": {
"connector.class": "io.confluent.connect.jdbc.JdbcSourceConnector",
"key.converter": "io.confluent.connect.avro.AvroConverter",
"key.converter.schema.registry.url": "http://localhost:8081",
"value.converter": "io.confluent.connect.avro.AvroConverter",
"value.converter.schema.registry.url": "http://localhost:8081",
"connection.url": "jdbc:mysql://localhost:3310/users?zeroDateTimeBehavior=ROUND&useCursorFetch=true&defaultFetchSize=1000&user=kotesh&password=kotesh",
"query": "SELECT matriid,DateUpdated from users.employee WHERE date(DateUpdated)>='2018-11-28' ",
"mode": "timestamp",
"timestamp.column.name": "DateUpdated",
"validate.non.null": "false",
"topic.prefix": "mysql-prod-kot-"
}
}
Run Code Online (Sandbox Code Playgroud)
我得到如下:
INFO TimestampIncrementingTableQuerier{table=null,query='SELECT matriid,DateUpdated from users.employee WHERE date(DateUpdated)>='2018-11-28'',topicPrefix='mysql-prod-kot-',incrementingColumn='', timestampColumns=[DateUpdated]} 准备好的 SQL 查询: SELECT matriid,DateUpdated from users.employee WHERE date(DateUpdated)>='2018-11-28' WHERE
DateUpdated
> ? 和DateUpdated
< ? ORDER BYDateUpdated
ASC (io.confluence.connect.jdbc.source.TimestampIncrementingTableQuerier:161) [2018-11-29 17:29:00,981] 错误无法运行表 TimestampIncrementingTableQuerier{table=null, query='SELECT matriid,DateUpdated 的查询来自 users.employee …
jdbc apache-kafka apache-kafka-connect confluent-schema-registry confluent-platform