kafka-connect-jdbc 不从源获取连续的时间戳

Geo*_*ona 1 postgresql jdbc apache-kafka apache-kafka-connect confluent-platform

我使用 kafka-connect-jdbc-4.0.0.jar 和 postgresql-9.4-1206-jdbc41.jar

kafka connect的connector配置

{
  "connector.class": "io.confluent.connect.jdbc.JdbcSourceConnector",
  "mode": "timestamp",
  "timestamp.column.name": "updated_at",
  "topic.prefix": "streaming.data.v2",
  "connection.password": "password",
  "connection.user": "user",
  "schema.pattern": "test",
  "query": "select * from view_source",
  "connection.url": "jdbc:postgresql://host:5432/test?currentSchema=test"
}
Run Code Online (Sandbox Code Playgroud)

我已经使用 jdbc 驱动程序配置了两个连接器一个源和另一个接收器,针对 postgresql 数据库(“PostgreSQL 9.6.9”)一切正常

我对连接器如何收集源数据有疑问,查看日志我看到执行查询之间存在 21 秒的时间差

11/1/2019 9:20:18[2019-01-11 08:20:18,985] DEBUG Checking for next block of results from TimestampIncrementingTableQuerier{name='null', query='select * from view_source', topicPrefix='streaming.data.v2', timestampColumn='updated_at', incrementingColumn='null'} (io.confluent.connect.jdbc.source.JdbcSourceTask)
11/1/2019 9:20:18[2019-01-11 08:20:18,985] DEBUG TimestampIncrementingTableQuerier{name='null', query='select * from view_source', topicPrefix='streaming.data.v2', timestampColumn='updated_at', incrementingColumn='null'} prepared SQL query: select * from view_source WHERE "updated_at" > ? AND "updated_at" < ? ORDER BY "updated_at" ASC (io.confluent.connect.jdbc.source.TimestampIncrementingTableQuerier)
11/1/2019 9:20:18[2019-01-11 08:20:18,985] DEBUG executing query select CURRENT_TIMESTAMP; to get current time from database (io.confluent.connect.jdbc.util.JdbcUtils)
11/1/2019 9:20:18[2019-01-11 08:20:18,985] DEBUG Executing prepared statement with timestamp value = 2019-01-11 08:17:07.000 end time = 2019-01-11 08:20:18.985 (io.confluent.connect.jdbc.source.TimestampIncrementingTableQuerier)
11/1/2019 9:20:19[2019-01-11 08:20:19,070] DEBUG Resetting querier TimestampIncrementingTableQuerier{name='null', query='select * from view_source', topicPrefix='streaming.data.v2', timestampColumn='updated_at', incrementingColumn='null'} (io.confluent.connect.jdbc.source.JdbcSourceTask)

11/1/2019 9:20:49[2019-01-11 08:20:49,499] DEBUG Checking for next block of results from TimestampIncrementingTableQuerier{name='null', query='select * from view_source', topicPrefix='streaming.data.v2', timestampColumn='updated_at', incrementingColumn='null'} (io.confluent.connect.jdbc.source.JdbcSourceTask)
11/1/2019 9:20:49[2019-01-11 08:20:49,500] DEBUG TimestampIncrementingTableQuerier{name='null', query='select * from view_source', topicPrefix='streaming.data.v2', timestampColumn='updated_at', incrementingColumn='null'} prepared SQL query: select * from view_source WHERE "updated_at" > ? AND "updated_at" < ? ORDER BY "updated_at" ASC (io.confluent.connect.jdbc.source.TimestampIncrementingTableQuerier)
11/1/2019 9:20:49[2019-01-11 08:20:49,500] DEBUG executing query select CURRENT_TIMESTAMP; to get current time from database (io.confluent.connect.jdbc.util.JdbcUtils)
11/1/2019 9:20:49[2019-01-11 08:20:49,500] DEBUG Executing prepared statement with timestamp value = 2019-01-11 08:20:39.000 end time = 2019-01-11 08:20:49.500 (io.confluent.connect.jdbc.source.TimestampIncrementingTableQuerier)

Run Code Online (Sandbox Code Playgroud)

第一个查询收集 08: 17: 07.000 和 08: 20: 18.985 之间的数据,但第二个查询收集 08: 20: 39.000 和 08: 20: 49.500 之间的数据......两者之间可能存在 21 秒的差异记录 ...

11/1/2019 9:20:18[2019-01-11 08:20:18,985] DEBUG Executing prepared statement with timestamp value = 2019-01-11 08:17:07.000 end time = 2019-01-11 08:20:18.985 
11/1/2019 9:20:49[2019-01-11 08:20:49,500] DEBUG Executing prepared statement with timestamp value = 2019-01-11 08:20:39.000 end time = 2019-01-11 08:20:49.500 
Run Code Online (Sandbox Code Playgroud)

我假设其中一个数据是获得的最后一条记录,另一个值是当时的时间戳

我找不到有关此的解释 连接器是否正常运行?您是否应该假设您并不总是会收集所有信息?

Rob*_*att 6

JDBC 连接器不能保证检索每条消息。为此,您需要基于日志的变更数据捕获。对于由 Debezium 和 Kafka Connect 提供的 Postgres。您可以在此处阅读更多相关信息。

免责声明:我为 Confluent 工作,并撰写了上述博客

编辑:这也是 ApacheCon 2020 上上述博客的录音:https ://rmoff.dev/no-more-silos

  • 非常感谢您的回复,我很看重 但是,为什么连接器不保证每一行的传输?既不使用递增模式?或时间戳+递增?我一定要使用 Debezium 吗? (2认同)
  • 这是基于轮询的方法的一个隐含方面,您无法防止在两次轮询尝试之间发生多个更新,在这种情况下,第一个更新将不会被捕获。Debezium 所做的基于日志的 CDC 避免了这种情况,因为它从 DBs 仅附加日志文件中获取所有更改(免责声明:我正在为 Debezium 做出贡献)。 (2认同)
  • @古纳尔。例如,如果我仅将“增量列”模式与 JDBC 连接器一起使用,并且每 1 小时轮询一次,是否有理由相信我会错过任何新记录? (2认同)