Confluence:错误无法对表 TimestampIncrementingTableQuerier mysql-jdbc 运行查询

kot*_*esh 3 jdbc apache-kafka apache-kafka-connect confluent-schema-registry confluent-platform

我正在尝试对 MySQL 使用模式时间戳,行数有限,因为我的表大小为 2.6 GB。

以下是我正在使用的连接器属性:

{
        "name": "jdbc_source_mysql_registration_query",
        "config": {
                 "connector.class": "io.confluent.connect.jdbc.JdbcSourceConnector",
                 "key.converter": "io.confluent.connect.avro.AvroConverter",
                 "key.converter.schema.registry.url": "http://localhost:8081",
                 "value.converter": "io.confluent.connect.avro.AvroConverter",
                 "value.converter.schema.registry.url": "http://localhost:8081",
                 "connection.url": "jdbc:mysql://localhost:3310/users?zeroDateTimeBehavior=ROUND&useCursorFetch=true&defaultFetchSize=1000&user=kotesh&password=kotesh",
                 "query": "SELECT matriid,DateUpdated  from users.employee WHERE date(DateUpdated)>='2018-11-28' ",
                 "mode": "timestamp",
                 "timestamp.column.name": "DateUpdated",
                 "validate.non.null": "false",
                 "topic.prefix": "mysql-prod-kot-"
        }
}
Run Code Online (Sandbox Code Playgroud)

我得到如下:

INFO TimestampIncrementingTableQuerier{table=null,query='SELECT matriid,DateUpdated from users.employee WHERE date(DateUpdated)>='2018-11-28'',topicPrefix='mysql-prod-kot-',incrementingColumn='', timestampColumns=[DateUpdated]} 准备好的 SQL 查询: SELECT matriid,DateUpdated from users.employee WHERE date(DateUpdated)>='2018-11-28' WHERE DateUpdated> ? 和 DateUpdated< ? ORDER BY DateUpdatedASC (io.confluence.connect.jdbc.source.TimestampIncrementingTableQuerier:161) [2018-11-29 17:29:00,981] 错误无法运行表 TimestampIncrementingTableQuerier{table=null, query='SELECT matriid,DateUpdated 的查询来自 users.employee WHERE date(DateUpdated)>='2018-11-28'', topicPrefix='mysql-prod-kot-',incrementingColumn='', timestampColumns=[DateUpdated]}: {} (io.confluence. connect.jdbc.source.JdbcSourceTask:328) java.sql.SQLSyntaxErrorException: 您的 SQL 语法有错误;检查与您的 MySQL 服务器版本相对应的手册,了解在第 1 行'WHERE DateUpdated> '1970-01-01 00:00:00.0' AND < '2018-11-29 17' 附近使用的正确语法DateUpdated

Gio*_*ous 6

发生这种情况是因为您尝试同时使用"mode": "timestamp"queryTimestampIncrementingTableQuerierWHERE与.WHEREquery

JDBC 源连接器文档对此很清楚:

query

如果指定,则执行查询以选择新的或更新的行。如果您想要连接表、选择表中的列子集或筛选数据,请使用此设置。如果使用,此连接器将仅使用此查询复制数据——整个表复制将被禁用。不同的查询模式仍然可以用于增量更新,但是为了正确构造增量查询,必须可以向该查询附加WHERE子句(即不能使用WHERE子句)。如果使用 WHERE 子句,它必须自行处理增量查询

作为解决方法,您可以将查询修改为(取决于您使用的 SQL 风格)

SELECT * FROM ( SELECT * FROM table WHERE ...)
Run Code Online (Sandbox Code Playgroud)

或者

WITH a AS
   SELECT * FROM b
    WHERE ...
SELECT * FROM a
Run Code Online (Sandbox Code Playgroud)

例如,在您的情况下,查询应该是

"query":"SELECT * FROM (SELECT matriid,DateUpdated  from users.employee WHERE date(DateUpdated)>='2018-11-28') o"
Run Code Online (Sandbox Code Playgroud)