标签: debezium

Debezium 是否可以配置 table_name => kafka 主题映射?

我已阅读http://debezium.io/docs/connectors/mysql/,但我找不到任何有关是否可以配置debezium的信息,以便可以将2个(或更多)表中的更改写入相同的单个kafka话题?在我看来,它总是 1 个表 -> 1 个主题。

mysql apache-kafka apache-kafka-connect debezium

3
推荐指数
1
解决办法
4670
查看次数

Debezium刷新超时和MySQL出现OutOfMemoryError错误

使用Debezium 0.7读取MySQL,但在初始快照阶段出现刷新超时和OutOfMemoryError错误。查看下面的日志,似乎连接器试图一次写太多消息:

WorkerSourceTask{id=accounts-connector-0} flushing 143706 outstanding messages for offset commit   [org.apache.kafka.connect.runtime.WorkerSourceTask]
WorkerSourceTask{id=accounts-connector-0} Committing offsets   [org.apache.kafka.connect.runtime.WorkerSourceTask]
Exception in thread "RMI TCP Connection(idle)" java.lang.OutOfMemoryError: Java heap space
WorkerSourceTask{id=accounts-connector-0} Failed to flush, timed out while waiting for producer to flush outstanding 143706 messages   [org.apache.kafka.connect.runtime.WorkerSourceTask]
Run Code Online (Sandbox Code Playgroud)

对于大型数据库(> 50GB),想知道什么是正确的设置http://debezium.io/docs/connectors/mysql/#connector-properties。对于较小的数据库,我没有这个问题。简单地增加超时时间似乎不是一个好的策略。我目前正在使用默认的连接器设置。

更新资料

按照以下建议更改设置,并解决了问题:

OFFSET_FLUSH_TIMEOUT_MS: 60000  # default 5000
OFFSET_FLUSH_INTERVAL_MS: 15000  # default 60000
MAX_BATCH_SIZE: 32768  # default 2048
MAX_QUEUE_SIZE: 131072  # default 8192
HEAP_OPTS: '-Xms2g -Xmx2g'  # default '-Xms1g -Xmx1g'
Run Code Online (Sandbox Code Playgroud)

java mysql apache-kafka apache-kafka-connect debezium

3
推荐指数
1
解决办法
845
查看次数

Debezium上的参数database.history的确切含义是什么?

我们正在使用Debezium从数据库中读取二进制日志到消息代理的流数据中的项目。对其进行更深入的研究,并试图更好地理解该工具,出现了两个与配置参数有关的问题:

  • 参数的确切含义是database.history什么?
  • 可能的值MemoryDatabaseHistory和之间有什么区别FileDatabaseHistory

谢谢!

mysql mysqlbinlog debezium

3
推荐指数
1
解决办法
81
查看次数

执行目标wildfly swarm插件时出现问题,API不兼容,java.lang.AbstractMethodError

我正在尝试debezium-swarm-demogithub存储库中构建应用程序debezium-examples

构建没有完成,给了我 Execution default of goal org.wildfly.swarm:wildfly-swarm-plugin:2018.5.0:package failed: An API incompatibility was encountered while executing org.wildfly.swarm:wildfly-swarm-plugin:2018.5.0:package: java.lang.AbstractMethodError: null

maven使用-e参数生成的输出如下:

[INFO] Error stacktraces are turned on.
[INFO] Scanning for projects...
[INFO] 
[INFO] ------------------< com.example:debezium-swarm-demo >-------------------
[INFO] Building WildFly Swarm Example 1.0.0-SNAPSHOT
[INFO] --------------------------------[ war ]---------------------------------
[INFO] 
[INFO] --- maven-clean-plugin:2.5:clean (default-clean) @ debezium-swarm-demo ---
[INFO] Deleting /home/anushka/SideProjects/Docker/debezium-examples/end-to-end-demo/debezium-swarm-demo/target
[INFO] 
[INFO] --- maven-resources-plugin:2.6:resources (default-resources) @ debezium-swarm-demo ---
[INFO] Using 'UTF-8' encoding to copy filtered resources.
[INFO] Copying 1 resource …
Run Code Online (Sandbox Code Playgroud)

java maven microservices wildfly-swarm debezium

3
推荐指数
1
解决办法
803
查看次数

Kafka Connect - 无法提交偏移量和刷新

我暂停了我的 Kafka 连接器,重新启动它们时,我的日志中出现了这些错误

[2020-02-19 19:36:00,219] ERROR WorkerSourceTask{id=wem-postgres-source-0} Failed to commit offsets (org.apache.kafka.connect.runtime.SourceTaskOffsetCommitter)
************
************
[2020-02-19 19:36:00,216] ERROR WorkerSourceTask{id=wem-postgres-source-0} Failed to flush, timed out while waiting for producer to flush outstanding 2389 messages (org.apache.kafka.connect.runtime.WorkerSourceTask)
Run Code Online (Sandbox Code Playgroud)

随着未完成消息的数量发生变化,我多次收到此错误。然后停了下来,再也没有看到。

我是否需要在此处采取任何措施或 Connect 重试并提交偏移量,这就是错误已停止的原因?

谢谢

apache-kafka apache-kafka-connect debezium

3
推荐指数
1
解决办法
3823
查看次数

Debezium:数据库中没有记录最大LSN;请确保 SQL Server 代理正在运行

这个问题与:Debezium 如何正确注册 SqlServer 连接器与 Kafka Connect - 连接被拒绝

在 Windows 10 中,我让 Debezium 在 Docker 容器外部的 Microsoft SQL Server 实例上运行。我每 390 毫秒收到以下警告:

数据库中没有记录最大 LSN;请确保 SQL Server 代理正在运行
[io.debezium.connector.sqlserver.SqlServerStreamingChangeEventSource]

我在 Github 上检查了 Debezium 的代码,唯一能在代码注释中找到此警告的地方指出,只有在 Agent 未运行时才应抛出此警告。我已确认 SQL Server 代理正在运行。

为什么会出现此警告,我该如何解决?

笔记:

我当前的解决方案似乎只适用于非生产环境 - 根据 Docker 的文档。

sql-server apache-kafka apache-kafka-connect debezium

3
推荐指数
1
解决办法
1937
查看次数

kafka s3 接收器连接器在获取 NULL 数据时崩溃

我有一个工作 s3 接收器连接器,直到源连接器发送一个 NULL 值;s3 连接器崩溃。当我从 MS SQL db 中删除一条记录时出现问题。源连接器将删除信息发送到 s3 连接器并且 s3 连接器崩溃。我删除并重新创建了一个不同名称的 s3 连接器,没有任何改变。

    org.apache.kafka.connect.errors.ConnectException: Null valued records are not writeable with current behavior.on.null.values 'settings.
        at io.confluent.connect.s3.format.avro.AvroRecordWriterProvider$1.write(AvroRecordWriterProvider.java:91)
        at io.confluent.connect.s3.TopicPartitionWriter.writeRecord(TopicPartitionWriter.java:502)
        at io.confluent.connect.s3.TopicPartitionWriter.checkRotationOrAppend(TopicPartitionWriter.java:275)
        at io.confluent.connect.s3.TopicPartitionWriter.executeState(TopicPartitionWriter.java:220)
        at io.confluent.connect.s3.TopicPartitionWriter.write(TopicPartitionWriter.java:189)
        at io.confluent.connect.s3.S3SinkTask.put(S3SinkTask.java:190)
        at org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:546)
        at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:326)
        at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:228)
        at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:196)
        at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:184)
        at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:234)
        at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
        at java.util.concurrent.FutureTask.run(FutureTask.java:266)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
        at java.lang.Thread.run(Thread.java:748)
2020-05-24 10:10:50,577 WARN WorkerSinkTask{id=minio-connector1-0} Ignoring invalid task provided offset filesql1.dbo.Files-0/OffsetAndMetadata{offset=16, leaderEpoch=null, metadata=''} -- not yet consumed, taskOffset=16 currentOffset=0 (org.apache.kafka.connect.runtime.WorkerSinkTask) …
Run Code Online (Sandbox Code Playgroud)

amazon-s3 offset apache-kafka apache-kafka-connect debezium

3
推荐指数
1
解决办法
686
查看次数

使用 debezium 链接 postgresql 11 无法获得数据库测试的编码

我使用 debezium cdc connect pg,我构建了 docker 使用的 pg 11?pg 运行良好。当我在 kafka 连接器中使用 debezium 时?它报告?

无法获得数据库测试的编码

卷曲是:

curl -H "Accept: application/json" -H "Content-type: application/json" -X POST http://localhost:8083/connectors/ -d '{
    "name": "debezium",
    "config": {
        "name": "debezium",
        "connector.class": "io.debezium.connector.postgresql.PostgresConnector",
        "tasks.max": "1",
        "database.hostname": "localhost",
        "database.port": "5432",
        "database.dbname": "test",
        "database.user": "pg",
        "database.password": "135790",
        "database.server.name": "ls",
        "table.whitelist": "public.test",
        "plugin.name": "pgoutput"
    }
}'
Run Code Online (Sandbox Code Playgroud)

卡夫卡例外是:

[2020-07-08 09:24:35,076] ERROR Uncaught exception in REST call to /connectors/ (org.apache.kafka.connect.runtime.rest.errors.ConnectExceptionMapper:61)
java.lang.RuntimeException: Couldn't obtain encoding for database test
    at io.debezium.connector.postgresql.connection.PostgresConnection.determineDatabaseCharset(PostgresConnection.java:434)
    at io.debezium.connector.postgresql.connection.PostgresConnection.<init>(PostgresConnection.java:77)
    at …
Run Code Online (Sandbox Code Playgroud)

postgresql apache-kafka apache-kafka-connect debezium

3
推荐指数
1
解决办法
947
查看次数

了解 Debezium

提供了一个用例:

流处理架构;事件进入 Kafka,然后由带有 MongoDB 接收器的作业进行处理。

数据库名称:myWebsite 集合:users

并且该作业会将user记录放入users集合中。

  1. 那么 Debezium 将监视users集合的变化,并且在每次变化时,都会将有关该主题的事件生成到 Kafka 中dbserver1.myWebsite.users?假设dbserver1是连接器的名称。
  2. 如果是这样,那么我可以有一个 Kafka 消费者从主题中消费dbserver1.myWebsite.users并对这些事件做出反应吗?
  3. 据我了解,Debezium 产生的事件也包含数据库记录的值?如果它的更改包含旧/新值?如果创建了一条数据库记录,旧的记录是否为空?

我想要某种形式来确认我到目前为止的理解。谢谢你!

debezium

2
推荐指数
1
解决办法
1171
查看次数

无法在启用 SSL 的 Kafka 集群中注册 Debezium (Kafka-Connect) 连接器

我正在尝试在启用 SSL 的 Kafka 集群中注册 MySql Debezium 连接器。我为此目的使用的卷曲是:

curl -k -X POST -H "Accept:application/json"  -H "Content-Type:application/json" https://<IPADDRESS>:8083/connectors/  -d '{ "name": "test-eds-extactor-profile", "config": { "connector.class": "io.debezium.connector.mysql.MySqlConnector", "database.hostname": "<DBHOSTNAME>", "database.port": "3306", "database.user": "debezium", "database.password": "*****", "database.server.id": "1", "database.server.name": "MySQL-Database-Docker", "database.history.kafka.bootstrap.servers": "<IPADDRESS>:9094", "database.history.kafka.topic": "dbhistory.profile" , "include.schema.changes": "true", "table.whitelist": "test_eds_extraction_src_db_mock.profile", "database.history.producer.security.protocol": "SASL_PLAINTEXT", "database.history.producer.ssl.keystore.location": "path/to/server.jks", "database.history.producer.ssl.keystore.password": "******", "database.history.producer.ssl.truststore.location": "path/to//server.jks", "database.history.producer.ssl.truststore.password": "******", "database.history.producer.ssl.key.password": "******", "database.history.consumer.security.protocol": "SASL_PLAINTEXT", "database.history.consumer.ssl.keystore.location": "path/to/server.jks", "database.history.consumer.ssl.keystore.password": "******", "database.history.consumer.ssl.truststore.location": "path/to/server.jks", "database.history.consumer.ssl.truststore.password": "******", "database.history.consumer.ssl.key.password": "******" } }'
Run Code Online (Sandbox Code Playgroud)

Debezium 无法创建 database.history 主题,失败并出现以下错误:

{"name":"test-eds-extactor-profile","connector":{"state":"RUNNING","worker_id":"<IPADDRESS>:8083"},"tasks":[{"id":0,"state":"FAILED","worker_id":"<IPADDRESS>:8083","trace":"org.apache.kafka.connect.errors.ConnectException: org.apache.kafka.common.KafkaException: Failed to …
Run Code Online (Sandbox Code Playgroud)

java apache-kafka apache-kafka-connect debezium

2
推荐指数
1
解决办法
5931
查看次数