I have read http://debezium.io/docs/connectors/mysql/, but I cannot find any information on whether Debezium can be configured so that changes from 2 (or more) tables are written to the same single Kafka topic. It looks to me like it is always 1 table -> 1 topic.
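Out of the box the MySQL connector does map each table to its own topic, but records can be rerouted after capture with a single message transform (SMT). A minimal sketch using Kafka Connect's built-in RegexRouter, assuming a server name dbserver1, a database inventory, and a combined topic all_tables (all three names are hypothetical):

# Reroute every per-table topic dbserver1.inventory.<table>
# into one combined topic (names are assumptions)
transforms=route
transforms.route.type=org.apache.kafka.connect.transforms.RegexRouter
transforms.route.regex=dbserver1.inventory.(.*)
transforms.route.replacement=all_tables

Debezium also ships io.debezium.transforms.ByLogicalTableRouter for merging identically structured (e.g. sharded) tables; note that combining tables in one topic is only practical when their event schemas are compatible.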
I am reading MySQL with Debezium 0.7, but I am hitting flush timeouts and OutOfMemoryError errors during the initial snapshot phase. Looking at the logs below, it seems the connector is trying to write too many messages in one go:
WorkerSourceTask{id=accounts-connector-0} flushing 143706 outstanding messages for offset commit [org.apache.kafka.connect.runtime.WorkerSourceTask]
WorkerSourceTask{id=accounts-connector-0} Committing offsets [org.apache.kafka.connect.runtime.WorkerSourceTask]
Exception in thread "RMI TCP Connection(idle)" java.lang.OutOfMemoryError: Java heap space
WorkerSourceTask{id=accounts-connector-0} Failed to flush, timed out while waiting for producer to flush outstanding 143706 messages [org.apache.kafka.connect.runtime.WorkerSourceTask]
For a large database (> 50 GB), I would like to know what the correct settings are: http://debezium.io/docs/connectors/mysql/#connector-properties. I do not have this problem with smaller databases. Simply increasing the timeout does not seem like a good strategy. I am currently using the default connector settings.
Changing the settings as suggested below resolved the problem:
OFFSET_FLUSH_TIMEOUT_MS: 60000 # default 5000
OFFSET_FLUSH_INTERVAL_MS: 15000 # default 60000
MAX_BATCH_SIZE: 32768 # default 2048
MAX_QUEUE_SIZE: 131072 # default 8192
HEAP_OPTS: '-Xms2g -Xmx2g' # default '-Xms1g -Xmx1g'
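For reference, these environment variables from the Debezium container image correspond to ordinary Kafka Connect worker and Debezium connector properties, so the same tuning can be expressed directly in configuration (a sketch mirroring the values above):

# Kafka Connect worker settings (e.g. connect-distributed.properties);
# defaults are 5000 and 60000 respectively
offset.flush.timeout.ms=60000
offset.flush.interval.ms=15000

# Debezium MySQL connector settings; defaults are 2048 and 8192
max.batch.size=32768
max.queue.size=131072

The larger batch and queue sizes let the snapshot drain through the producer faster, which is presumably why the flush no longer times out.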
We are working on a project that uses Debezium to stream data from database binlogs to a message broker. Digging deeper into the tool and trying to understand it better, two questions about configuration parameters came up:
What is database.history? And what is the difference between MemoryDatabaseHistory and FileDatabaseHistory? Thanks!
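Briefly: database.history selects where the connector stores the DDL history it needs to reconstruct table schemas at any binlog position after a restart. MemoryDatabaseHistory keeps that history in memory only (lost when the connector stops, so suitable for tests), while FileDatabaseHistory persists it to a local file. A minimal sketch of the file-based variant (the path is an assumption):

# Persist schema history to a local file so it survives restarts
database.history=io.debezium.relational.history.FileDatabaseHistory
# hypothetical location
database.history.file.filename=/var/lib/debezium/dbhistory.dat

In distributed deployments the Kafka-backed history (database.history.kafka.bootstrap.servers plus database.history.kafka.topic, as in the curl examples further down) is the usual choice.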
I am trying to build the debezium-swarm-demo application in the debezium-examples GitHub repository.
The build does not complete, giving me:
Execution default of goal org.wildfly.swarm:wildfly-swarm-plugin:2018.5.0:package failed: An API incompatibility was encountered while executing org.wildfly.swarm:wildfly-swarm-plugin:2018.5.0:package: java.lang.AbstractMethodError: null
The output generated by Maven with the -e flag is as follows:
[INFO] Error stacktraces are turned on.
[INFO] Scanning for projects...
[INFO]
[INFO] ------------------< com.example:debezium-swarm-demo >-------------------
[INFO] Building WildFly Swarm Example 1.0.0-SNAPSHOT
[INFO] --------------------------------[ war ]---------------------------------
[INFO]
[INFO] --- maven-clean-plugin:2.5:clean (default-clean) @ debezium-swarm-demo ---
[INFO] Deleting /home/anushka/SideProjects/Docker/debezium-examples/end-to-end-demo/debezium-swarm-demo/target
[INFO]
[INFO] --- maven-resources-plugin:2.6:resources (default-resources) @ debezium-swarm-demo ---
[INFO] Using 'UTF-8' encoding to copy filtered resources.
[INFO] Copying 1 resource …
I paused my Kafka connectors, and when I restarted them these errors appeared in my logs:
[2020-02-19 19:36:00,219] ERROR WorkerSourceTask{id=wem-postgres-source-0} Failed to commit offsets (org.apache.kafka.connect.runtime.SourceTaskOffsetCommitter)
************
************
[2020-02-19 19:36:00,216] ERROR WorkerSourceTask{id=wem-postgres-source-0} Failed to flush, timed out while waiting for producer to flush outstanding 2389 messages (org.apache.kafka.connect.runtime.WorkerSourceTask)
I got this error several times, with the number of outstanding messages changing each time. Then it stopped and I have not seen it since.
Do I need to take any action here, or does Connect retry the offset commit itself, which would be why the error stopped?
Thanks
This question is related to: Debezium: How do I correctly register the SqlServer connector with Kafka Connect - connection refused
On Windows 10, I have Debezium running against a Microsoft SQL Server instance outside a Docker container. Every 390 milliseconds I get the following warning:
No maximum LSN recorded in the database; please ensure SQL Server Agent is running
[io.debezium.connector.sqlserver.SqlServerStreamingChangeEventSource]
I checked Debezium's code on GitHub, and the only place I could find this warning states in a code comment that it should only be raised when the Agent is not running. I have confirmed that SQL Server Agent is running.
Why does this warning appear, and how can I fix it?
Note:
My current solution appears to be suitable only for non-production environments, according to Docker's documentation.
I had a working S3 sink connector until the source connector sent a NULL value; the S3 connector crashed. The problem occurred when I deleted a record from the MS SQL database: the source connector sent the delete information to the S3 connector, and the S3 connector crashed. I deleted the S3 connector and re-created it under a different name; nothing changed.
org.apache.kafka.connect.errors.ConnectException: Null valued records are not writeable with current behavior.on.null.values 'settings.
at io.confluent.connect.s3.format.avro.AvroRecordWriterProvider$1.write(AvroRecordWriterProvider.java:91)
at io.confluent.connect.s3.TopicPartitionWriter.writeRecord(TopicPartitionWriter.java:502)
at io.confluent.connect.s3.TopicPartitionWriter.checkRotationOrAppend(TopicPartitionWriter.java:275)
at io.confluent.connect.s3.TopicPartitionWriter.executeState(TopicPartitionWriter.java:220)
at io.confluent.connect.s3.TopicPartitionWriter.write(TopicPartitionWriter.java:189)
at io.confluent.connect.s3.S3SinkTask.put(S3SinkTask.java:190)
at org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:546)
at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:326)
at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:228)
at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:196)
at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:184)
at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:234)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
2020-05-24 10:10:50,577 WARN WorkerSinkTask{id=minio-connector1-0} Ignoring invalid task provided offset filesql1.dbo.Files-0/OffsetAndMetadata{offset=16, leaderEpoch=null, metadata=''} -- not yet consumed, taskOffset=16 currentOffset=0 (org.apache.kafka.connect.runtime.WorkerSinkTask) …
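The exception names the relevant sink setting: after a delete, Debezium emits the delete event followed by a tombstone record with a null value, and the S3 sink rejects null values by default. A sketch of the usual remedies (pick one side; which is appropriate depends on the pipeline):

# On the Confluent S3 sink: skip records whose value is null
behavior.on.null.values=ignore

# Or on the Debezium source: do not emit tombstones at all
tombstones.on.delete=false

Note that disabling tombstones also prevents log compaction from removing deleted keys on the topic, so the sink-side setting is usually the safer choice.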
I am using Debezium CDC with Postgres; I built PostgreSQL 11 with Docker, and Postgres itself runs fine. When I use Debezium in the Kafka connector, it reports:
Couldn't obtain encoding for database test
The curl command is:
curl -H "Accept: application/json" -H "Content-type: application/json" -X POST http://localhost:8083/connectors/ -d '{
"name": "debezium",
"config": {
"name": "debezium",
"connector.class": "io.debezium.connector.postgresql.PostgresConnector",
"tasks.max": "1",
"database.hostname": "localhost",
"database.port": "5432",
"database.dbname": "test",
"database.user": "pg",
"database.password": "135790",
"database.server.name": "ls",
"table.whitelist": "public.test",
"plugin.name": "pgoutput"
}
}'
The Kafka exception is:
[2020-07-08 09:24:35,076] ERROR Uncaught exception in REST call to /connectors/ (org.apache.kafka.connect.runtime.rest.errors.ConnectExceptionMapper:61)
java.lang.RuntimeException: Couldn't obtain encoding for database test
at io.debezium.connector.postgresql.connection.PostgresConnection.determineDatabaseCharset(PostgresConnection.java:434)
at io.debezium.connector.postgresql.connection.PostgresConnection.<init>(PostgresConnection.java:77)
at …
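One common cause of this symptom, when Kafka Connect itself runs in a container, is that database.hostname is set to localhost, which inside the Connect container points back at the container itself rather than at Postgres; the connector then cannot open the JDBC connection it uses to read the database charset. A sketch of the fields to check, assuming the Postgres container is reachable under the Docker network name postgres (hypothetical):

# Point the connector at the Postgres container, not at the
# Connect container's own loopback interface (name is an assumption)
database.hostname=postgres
database.port=5432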
Given the following use case:
A stream-processing architecture; events land in Kafka and are then processed by a job with a MongoDB sink.
Database name: myWebsite
Collection: users
The job writes user records into the users collection.
Can Debezium watch the users collection for changes and, on every change, produce an event to Kafka on the topic dbserver1.myWebsite.users (assuming dbserver1 is the connector's name)? Could I then consume dbserver1.myWebsite.users and react to those events? I would like some form of confirmation of my understanding so far. Thank you!
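That matches how the Debezium MongoDB connector names topics: <logical name>.<database>.<collection>. A minimal sketch of such a connector (host and replica-set details are assumptions, and MongoDB must run as a replica set; newer Debezium releases spell the filter collection.include.list rather than collection.whitelist):

connector.class=io.debezium.connector.mongodb.MongoDbConnector
# replica set name and host are assumptions
mongodb.hosts=rs0/mongodb:27017
# logical name; becomes the topic prefix
mongodb.name=dbserver1
# capture only the users collection
collection.whitelist=myWebsite.users

Change events for users then appear on dbserver1.myWebsite.users, and any ordinary Kafka consumer can subscribe to that topic and react.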
I am trying to register the MySql Debezium connector with an SSL-enabled Kafka cluster. The curl command I am using for this purpose is:
curl -k -X POST -H "Accept:application/json" -H "Content-Type:application/json" https://<IPADDRESS>:8083/connectors/ -d '{
"name": "test-eds-extactor-profile",
"config": {
"connector.class": "io.debezium.connector.mysql.MySqlConnector",
"database.hostname": "<DBHOSTNAME>",
"database.port": "3306",
"database.user": "debezium",
"database.password": "*****",
"database.server.id": "1",
"database.server.name": "MySQL-Database-Docker",
"database.history.kafka.bootstrap.servers": "<IPADDRESS>:9094",
"database.history.kafka.topic": "dbhistory.profile",
"include.schema.changes": "true",
"table.whitelist": "test_eds_extraction_src_db_mock.profile",
"database.history.producer.security.protocol": "SASL_PLAINTEXT",
"database.history.producer.ssl.keystore.location": "path/to/server.jks",
"database.history.producer.ssl.keystore.password": "******",
"database.history.producer.ssl.truststore.location": "path/to//server.jks",
"database.history.producer.ssl.truststore.password": "******",
"database.history.producer.ssl.key.password": "******",
"database.history.consumer.security.protocol": "SASL_PLAINTEXT",
"database.history.consumer.ssl.keystore.location": "path/to/server.jks",
"database.history.consumer.ssl.keystore.password": "******",
"database.history.consumer.ssl.truststore.location": "path/to/server.jks",
"database.history.consumer.ssl.truststore.password": "******",
"database.history.consumer.ssl.key.password": "******"
}
}'
Debezium fails to create the database.history topic, failing with the following error:
{"name":"test-eds-extactor-profile","connector":{"state":"RUNNING","worker_id":"<IPADDRESS>:8083"},"tasks":[{"id":0,"state":"FAILED","worker_id":"<IPADDRESS>:8083","trace":"org.apache.kafka.connect.errors.ConnectException: org.apache.kafka.common.KafkaException: Failed to …