Debezium 无法对大表大小进行快照

jap*_*izo 1 apache-kafka apache-kafka-connect debezium

我想我可能错过了一些配置,但我们正在尝试使用 Debezium 对一个包含大约 800 万条记录的表中的所有行进行快照,一段时间后它会停止。

连接器配置是:

{
   "connector.class":"io.debezium.connector.mysql.MySqlConnector",
   "database.user":"MyUser",
   "database.server.id":"12345",
   "tasks.max":"1",
   "database.history.kafka.bootstrap.servers":"MyKafka:9092",
   "database.history.kafka.topic":"MyConnectorHistory",
   "database.server.name":"MyDbName",
   "database.port":"3306",
   "table.whitelist":"BigTable",
   "decimal.handling.mode":"double",
   "database.hostname":"***",
   "database.password":"***",
   "name":"MyConnector",
   "database.whitelist":"MyDb",
   "snapshot.mode":"initial_only",
   "connect.timeout.ms":"60000"
}

Run Code Online (Sandbox Code Playgroud)

连接器开始扫描行:

  April 24th 2019, 13:06:52.573 2019-04-24 16:06:52,569 INFO   MySQL|MyDbName|snapshot  Step 9: - 2040000 of 8609643 rows scanned from table 'MyDb.BigTable' after 00:59:29.129   [io.debezium.connector.mysql.SnapshotReader]
  ... other prints
  April 24th 2019, 12:17:28.448 2019-04-24 15:17:28,447 INFO   MySQL|MyDbName|snapshot  Step 9: - 50000 of 8609643 rows scanned from table 'MyDb.BigTable' after 00:10:05.008   [io.debezium.connector.mysql.SnapshotReader]
    April 24th 2019, 12:07:43.183   2019-04-24 15:07:43,183 INFO   MySQL|MyDbName|snapshot  Step 9: - 40000 of 8609643 rows scanned from table 'MyDb.BigTable' after 00:00:19.744   [io.debezium.connector.mysql.SnapshotReader]
    April 24th 2019, 12:07:36.499   2019-04-24 15:07:36,498 INFO   MySQL|MyDbName|snapshot  Step 9: - 30000 of 8609643 rows scanned from table 'MyDb.BigTable' after 00:00:13.059   [io.debezium.connector.mysql.SnapshotReader]
    April 24th 2019, 12:07:30.157   2019-04-24 15:07:30,157 INFO   MySQL|MyDbName|snapshot  Step 9: - 20000 of 8609643 rows scanned from table 'MyDb.BigTable' after 00:00:06.718   [io.debezium.connector.mysql.SnapshotReader]
    April 24th 2019, 12:07:25.116   2019-04-24 15:07:25,116 INFO   MySQL|MyDbName|snapshot  Step 9: - 10000 of 8609643 rows scanned from table 'MyDb.BigTable' after 00:00:01.677   [io.debezium.connector.mysql.SnapshotReader]
    April 24th 2019, 12:07:23.439   2019-04-24 15:07:23,439 INFO   MySQL|MyDbName|snapshot  Step 9: - scanning table 'MyDb.BigTable' (1 of 10 tables)   [io.debezium.connector.mysql.SnapshotReader]
    April 24th 2019, 12:07:23.427   2019-04-24 15:07:23,427 INFO   MySQL|MyDbName|snapshot  Step 8: tables were locked explicitly, but to get a consistent snapshot we cannot release the locks until we've read all tables.   [io.debezium.connector.mysql.SnapshotReader]
    April 24th 2019, 12:07:23.427   2019-04-24 15:07:23,427 INFO   MySQL|MyDbName|snapshot  Step 9: scanning contents of 10 tables while still in transaction   [io.debezium.connector.mysql.SnapshotReader]
    April 24th 2019, 12:07:23.143   2019-04-24 15:07:23,143 INFO   MySQL|MyDbName|snapshot  Step 7: generating DROP and CREATE statements to reflect current database schemas:   [io.debezium.connector.mysql.SnapshotReader]
    April 24th 2019, 12:07:23.142   2019-04-24 15:07:23,142 INFO   MySQL|MyDbName|snapshot  Step 6: read binlog position of MySQL master   [io.debezium.connector.mysql.SnapshotReader]
    April 24th 2019, 12:07:22.739   2019-04-24 15:07:22,739 INFO   MySQL|MyDbName|snapshot  Step 5: flush and obtain read lock for 10 tables (preventing writes)   [io.debezium.connector.mysql.SnapshotReader]
    April 24th 2019, 12:07:22.635   2019-04-24 15:07:22,635 INFO   MySQL|MyDbName|snapshot  Step 4: read list of available tables in each database   [io.debezium.connector.mysql.SnapshotReader]
    April 24th 2019, 12:07:22.633   2019-04-24 15:07:22,633 INFO   MySQL|MyDbName|snapshot  Step 3: read list of available databases   [io.debezium.connector.mysql.SnapshotReader]
    April 24th 2019, 12:07:22.632   2019-04-24 15:07:22,632 INFO   MySQL|MyDbName|snapshot  Step 2: start transaction with consistent snapshot   [io.debezium.connector.mysql.SnapshotReader]
    April 24th 2019, 12:07:22.632   2019-04-24 15:07:22,631 INFO   MySQL|MyDbName|snapshot  Step 1: unable to flush and acquire global read lock, will use table read locks after reading table names   [io.debezium.connector.mysql.SnapshotReader]
    April 24th 2019, 12:07:22.617   2019-04-24 15:07:22,617 INFO   MySQL|MyDbName|snapshot  Step 1: flush and obtain global read lock to prevent writes to database   [io.debezium.connector.mysql.SnapshotReader]
Run Code Online (Sandbox Code Playgroud)

然后过了一段时间,我们得到

    Failed to flush, timed out while waiting for producer to flush outstanding 4094 messages
    Failed to commit offsets   [org.apache.kafka.connect.runtime.SourceTaskOffsetCommitter]

Run Code Online (Sandbox Code Playgroud)

然后,扫描停止,我们有几次尝试再次刷新提交偏移量:

  April 24th 2019, 12:34:08.641 2019-04-24 15:34:08,641 ERROR  ||  WorkerSourceTask{id=MyConnectorr-0} Failed to commit offsets   [org.apache.kafka.connect.runtime.SourceTaskOffsetCommitter]
    April 24th 2019, 12:34:08.640   2019-04-24 15:34:08,640 ERROR  ||  WorkerSourceTask{id=MyConnectorr-0} Failed to flush, timed out while waiting for producer to flush outstanding 5560 messages   [org.apache.kafka.connect.runtime.WorkerSourceTask]
    April 24th 2019, 12:33:18.640   2019-04-24 15:33:18,640 INFO   ||  WorkerSourceTask{id=MyConnectorr-0} Committing offsets   [org.apache.kafka.connect.runtime.WorkerSourceTask]
    April 24th 2019, 12:33:18.640   2019-04-24 15:33:18,640 INFO   ||  WorkerSourceTask{id=MyConnectorr-0} flushing 5560 outstanding messages for offset commit   [org.apache.kafka.connect.runtime.WorkerSourceTask]
    April 24th 2019, 12:32:18.640   2019-04-24 15:32:18,640 ERROR  ||  WorkerSourceTask{id=MyConnectorr-0} Failed to commit offsets   [org.apache.kafka.connect.runtime.SourceTaskOffsetCommitter]
    April 24th 2019, 12:32:18.639   2019-04-24 15:32:18,639 ERROR  ||  WorkerSourceTask{id=MyConnectorr-0} Failed to flush, timed out while waiting for producer to flush outstanding 5560 messages   [org.apache.kafka.connect.runtime.WorkerSourceTask]
    April 24th 2019, 12:31:28.639   2019-04-24 15:31:28,639 INFO   ||  WorkerSourceTask{id=MyConnectorr-0} Committing offsets   [org.apache.kafka.connect.runtime.WorkerSourceTask]
    April 24th 2019, 12:31:28.639   2019-04-24 15:31:28,639 INFO   ||  WorkerSourceTask{id=MyConnectorr-0} flushing 5560 outstanding messages for offset commit   [org.apache.kafka.connect.runtime.WorkerSourceTask]
    April 24th 2019, 12:30:28.639   2019-04-24 15:30:28,639 ERROR  ||  WorkerSourceTask{id=MyConnectorr-0} Failed to commit offsets   [org.apache.kafka.connect.runtime.SourceTaskOffsetCommitter]
    April 24th 2019, 12:30:28.636   2019-04-24 15:30:28,635 ERROR  ||  WorkerSourceTask{id=MyConnectorr-0} Failed to flush, timed out while waiting for producer to flush outstanding 652 messages   [org.apache.kafka.connect.runtime.WorkerSourceTask]
    April 24th 2019, 12:29:38.635   2019-04-24 15:29:38,635 INFO   ||  WorkerSourceTask{id=MyConnectorr-0} flushing 5556 outstanding messages for offset commit   [org.apache.kafka.connect.runtime.WorkerSourceTask]
    April 24th 2019, 12:29:38.635   2019-04-24 15:29:38,635 INFO   ||  WorkerSourceTask{id=MyConnectorr-0} Committing offsets  
Run Code Online (Sandbox Code Playgroud)

一段时间后(大约 9 ~ 10 分钟),它似乎成功并开始再次扫描行。但是一段时间后它再次失败,然后没有完成所有记录,连接器将其状态更改为FAIL

你的错误之一是

{
   "name":"MyConnector",
   "connector":{
      "state":"RUNNING",
      "worker_id":"svc.cluster.local:8083"
   },
   "tasks":[
      {
         "state":"FAILED",
         "trace":"org.apache.kafka.connect.errors.ConnectException: OffsetStorageWriter is already flushing\n\tat org.apache.kafka.connect.storage.OffsetStorageWriter.beginFlush(OffsetStorageWriter.java:110)\n\tat org.apache.kafka.connect.runtime.WorkerSourceTask.commitOffsets(WorkerSourceTask.java:318)\n\tat org.apache.kafka.connect.runtime.WorkerSourceTask.execute(WorkerSourceTask.java:197)\n\tat org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:170)\n\tat org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:214)\n\tat java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)\n\tat java.util.concurrent.FutureTask.run(FutureTask.java:266)\n\tat java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)\n\tat java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)\n\tat java.lang.Thread.run(Thread.java:748)\n",
         "id":0,
         "worker_id":"svc.cluster.local:8083"
      }
   ],
   "type":"source"
}
Run Code Online (Sandbox Code Playgroud)

我阅读了这个问题:https : //github.com/confluentinc/kafka-connect-jdbc/issues/161 并尝试按照建议更改参数值。更好,但过了一段时间后仍然失败:现在,我的连接配置是:

OFFSET_FLUSH_INTERVAL_MS: 60000
OFFSET_FLUSH_TIMEOUT_MS: 50000
CONNECT_PRODUCER_BUFFER_MEMORY: 45554432
Run Code Online (Sandbox Code Playgroud)

我还尝试了此处描述的这些值:Debezium flush timeout and OutOfMemoryError errors with MySQL

我还没有尝试过的一件事是使用snapshot.select.statement.overrides参数。但我不确定它会有所帮助,因为有时在 100k 条消息时会出现提交偏移问题。我将不得不多次恢复和停止连接器。

mah*_*r24 5

我使用 Debezium 对 MySQL 数据库进行了快照,该数据库具有超过 3000 万条记录的多个表。不过,我们有一个拥有 1 亿多条记录的记录。对于那个,我使用了 select 语句覆盖配置(因为它是一个仅插入表)。

最初,使用默认设置对数据库进行快照,我遇到了与您面临的完全相同的问题。我调整了以下配置,它帮助解决了我的问题。

kafka 连接 worker 配置在 worker.properties 配置文件中设置:

offset.flush.timeout.ms=60000
offset.flush.interval.ms=10000
max.request.size=10485760
Run Code Online (Sandbox Code Playgroud)

减少偏移刷新间隔允许 Kafka 连接更频繁地刷新偏移,设置一个大的超时让它有更多的时间来获得对提交的确认。

Debezium 配置通过 curl 请求对其进行初始化:

max.queue.size = 81290
max.batch.size = 20480
Run Code Online (Sandbox Code Playgroud)

队列的默认大小是 8192,这对于较大的数据库来说是相当低的。增加这些配置有很大帮助。

希望这对您的问题有所帮助

  • 平均每 5 分钟大约有 100 万条记录。时间因牌桌而异,但总是在 4-6 分钟之间。请注意,我们的 Kafka 集群(4 个节点、16GB RAM、4 个核心)强制执行了 SASL/SSL,因此速度明显减慢。当我使用相同大小但没有 SASL/SSL 的 Kafka 集群对其进行测试时,我注意到它每 2 分钟左右执行大约一百万次。其他影响速度的因素是 Kafka 中这些主题的 Producer.acks 设置和复制因子。拍摄数据库快照(总共 8 亿多条记录)需要 3 天。 (3认同)