标签: debezium

Kafka Connect JDBC 与 Debezium CDC

JDBC 连接器Debezium SQL Server CDC 连接器有什么区别(或任何其他关系数据库连接器)我应该什么时候选择一个而不是另一个,寻找在两个关系数据库之间同步的解决方案?

不确定这个讨论是否应该是关于 CDC 与 JDBC 连接器,而不是 Debezium SQL Server CDC 连接器,甚至只是 Debezium,期待以后的编辑,取决于给定的答案(虽然我的情况是关于 SQL Server 接收器)。

与您分享我对这个主题的研究,这让我想到了这个问题(作为答案)

sql jdbc cdc apache-kafka-connect debezium

24
推荐指数
2
解决办法
6234
查看次数

使用数据库 CDC 的事件溯源是否被认为是好的架构?

当我们谈论采购事件时,我们有一个简单的双写架构,我们可以写入数据库,然后将事件写入队列,如 Kafka。其他下游系统可以读取这些事件并相应地对它们采取行动/使用它们。

但是当试图使数据库和事件同步时会出现问题,因为需要对这些事件进行排序才能使其有意义。

为了解决这个问题,人们鼓励使用数据库提交日志作为事件源,并且有围绕它构建的工具,例如 Airbnb 的 Spinal Tap、Redhat 的 Debezium、Oracle 的 Golden Gate 等……它解决了一致性、排序保证和所有这些。

但是使用数据库提交日志作为事件源的问题是我们与数据库模式紧密耦合。微服务的数据库架构是公开的,数据库架构中的任何破坏性更改,如数据类型更改或列名更改,实际上都可能破坏下游系统。

那么使用 DB CDC 作为事件源是个好主意吗?

关于这个问题和使用 Debezium 进行事件溯源的讨论

software-design cqrs event-sourcing microservices debezium

12
推荐指数
1
解决办法
2632
查看次数

debezium 无法使用 postgres 11 和默认插件 pgoutput 访问文件“decoderbufs”

我是 kafka 的新手,我正在尝试使用 debezium postgres 连接器。但即使使用带有标准插件的 postgres 11 版,我也会收到此错误:org.apache.kafka.connect.errors.ConnectException: org.postgresql.util.PSQLException: ERROR: could not access file "decoderbufs": No such file or directory

要运行 kafka / debezium,我使用了 fast-data-dev docker 的图像,如下所示

  # this is our kafka cluster.
  kafka-cluster:
    image: landoop/fast-data-dev:latest
    environment:
      ADV_HOST: 127.0.0.1         # Change to 192.168.99.100 if using Docker Toolbox
      RUNTESTS: 0                 # Disable Running tests so the cluster starts faster
    ports:
      - 2181:2181                 # Zookeeper
      - 3030:3030                 # Landoop UI
      - 8081-8083:8081-8083       # REST Proxy, Schema Registry, Kafka Connect ports
      - …
Run Code Online (Sandbox Code Playgroud)

apache-kafka debezium

12
推荐指数
1
解决办法
2536
查看次数

无法使用Confluent Elasticsearch接收器将Kafka主题数据转换为结构化JSON

我正在使用Kafka构建数据管道.数据流如下:捕获mongodb中的数据更改并将其发送到elasticsearch.

在此输入图像描述

MongoDB的

  • 版本3.6
  • 分片群集

卡夫卡

  • Confuent Platform 4.1.0
  • mongoDB源连接器:debezium 0.7.5
  • elasticserach水槽连接器

Elasticsearch

  • 版本6.1.0

由于我还在测试,Kafka相关系统正在单个服务器上运行.

  • 启动zookeepr

    $ bin/zookeeper-server-start etc/kafka/zookeeper.properties
    
    Run Code Online (Sandbox Code Playgroud)
  • 启动引导服务器

    $ bin/kafka-server-start etc/kafka/server.properties
    
    Run Code Online (Sandbox Code Playgroud)
  • 启动注册表架构

    $ bin/schema-registry-start etc/schema-registry/schema-registry.properties
    
    Run Code Online (Sandbox Code Playgroud)
  • 启动mongodb源connetor

    $ bin/connect-standalone \ 
      etc/schema-registry/connect-avro-standalone.properties \ 
      etc/kafka/connect-mongo-source.properties
    
    $ cat etc/kafka/connect-mongo-source.properties
    >>> 
    name=mongodb-source-connector
    connector.class=io.debezium.connector.mongodb.MongoDbConnector
    mongodb.hosts=''
    initial.sync.max.threads=1
    tasks.max=1
    mongodb.name=higee
    
    $ cat etc/schema-registry/connect-avro-standalone.properties
    >>>
    bootstrap.servers=localhost:9092
    key.converter=io.confluent.connect.avro.AvroConverter
    key.converter.schema.registry.url=http://localhost:8081
    value.converter=io.confluent.connect.avro.AvroConverter
    value.converter.schema.registry.url=http://localhost:8081
    internal.key.converter=org.apache.kafka.connect.json.JsonConverter
    internal.value.converter=org.apache.kafka.connect.json.JsonConverter
    internal.key.converter.schemas.enable=false
    internal.value.converter.schemas.enable=false
    rest.port=8083
    
    Run Code Online (Sandbox Code Playgroud)
  • 启动elasticsearch sink连接器

    $ bin/connect-standalone \ 
      etc/schema-registry/connect-avro-standalone2.properties  \ 
      etc/kafka-connect-elasticsearch/elasticsearch.properties
    
    $ cat etc/kafka-connect-elasticsearch/elasticsearch.properties
    >>>
    name=elasticsearch-sink
    connector.class=io.confluent.connect.elasticsearch.ElasticsearchSinkConnector
    tasks.max=1
    topics=higee.higee.higee
    key.ignore=true
    connection.url=''
    type.name=kafka-connect
    
    $ cat etc/schema-registry/connect-avro-standalone2.properties
    >>> …
    Run Code Online (Sandbox Code Playgroud)

python mongodb elasticsearch apache-kafka debezium

8
推荐指数
1
解决办法
2071
查看次数

使用 AWS MSK NOT_ENOUGH_REPLICAS 的 Debezium

我在 AWS 中有一个正在运行的 debezium 集群,没有问题。我想尝试一下 AWS MSK。所以我启动了一个集群。然后我启动了一个 EC2 来运行我的连接器。

然后安装 confluent-kafka

sudo apt-get update && sudo apt-get install confluent-platform-2.12
Run Code Online (Sandbox Code Playgroud)

默认情况下,AWS MSK 没有架构注册表,所以我从连接器 EC2 架构注册表 conf 文件配置了它:

kafkastore.connection.url=z-1.bhuvi-XXXXXXXXX.amazonaws.com:2181,z-3.bhuvi-XXXXXXXXX.amazonaws.com:2181,z-2.bhuvi-XXXXXXXXX.amazonaws.com:2181


kafkastore.bootstrap.servers=PLAINTEXT://b-2.bhuvi-XXXXXXXXX.amazonaws.com:9092,PLAINTEXT://b-4.bhuvi-XXXXXXXXX.amazonaws.com:9092,PLAINTEXT://b-1.bhuvi-XXXXXXXXX.amazonaws.com:9092
Run Code Online (Sandbox Code Playgroud)

然后/etc/kafka/connect-distributed.properties归档

bootstrap.servers=b-4.bhuvi-XXXXXXXXX.amazonaws.com:9092,b-3.bhuvi-XXXXXXXXX.amazonaws.com:9092,b-2.bhuvi-XXXXXXXXX.amazonaws.com:9092

plugin.path=/usr/share/java,/usr/share/confluent-hub-components
Run Code Online (Sandbox Code Playgroud)

安装连接器:

confluent-hub install debezium/debezium-connector-mysql:latest
Run Code Online (Sandbox Code Playgroud)

启动服务

systemctl start confluent-schema-registry
systemctl start confluent-connect-distributed
Run Code Online (Sandbox Code Playgroud)

现在一切都开始了。然后我创建了一个 mysql.json 文件。

{
    "name": "mysql-connector-db01",
    "config": {
        "name": "mysql-connector-db01",
        "connector.class": "io.debezium.connector.mysql.MySqlConnector",
        "database.server.id": "1",
        "tasks.max": "3",
        "database.history.kafka.bootstrap.servers": "172.31.47.152:9092,172.31.38.158:9092,172.31.46.207:9092",
        "database.history.kafka.topic": "schema-changes.mysql",
        "database.server.name": "mysql-db01",
        "database.hostname": "172.31.84.129",
        "database.port": "3306",
        "database.user": "bhuvi",
        "database.password": "my_stong_password",
        "database.whitelist": "proddb,test",
        "internal.key.converter.schemas.enable": "false",
        "key.converter.schemas.enable": "false", …
Run Code Online (Sandbox Code Playgroud)

amazon-web-services apache-kafka debezium amazon-msk

7
推荐指数
1
解决办法
3592
查看次数

使用独立模式 Kafka-connect 将数据捕获从 Postgres SQL 更改为 kafka 主题

我一直在尝试使用以下命令 /bin connect-standalone.properties config/connect-standalone.properties postgres.sproperties 从 postgres sql 到 kafka 主题获取数据,但我面临着几个问题,这里是我的 postgres 的内容。属性文件:

name=cdc_demo
connector.class=io.debezium.connector.postgresql.PostgresConnector
tasks.max=1
plugin.name=decoderbufs
slot.name=debezium
slot.drop_on_stop=false
database.hostname=localhost
database.port=5432
database.user=postgres
database.password=XXXXX
database.dbname=snehildb
time.precision.mode=adaptive
database.sslmode=disable
database.server.name=localhost:5432/snehildb
table.whitelist=public.students
decimal.handling.mode=precise
topic.creation.enable=true`
Run Code Online (Sandbox Code Playgroud)

以下是 connect-standalone.properties 的内容:

# These are defaults. This file just demonstrates how to override some settings.
bootstrap.servers=localhost:9092

# The converters specify the format of data in Kafka and how to translate it into Connect data. Every 
Connect user will
# need to configure these based on the format they want …
Run Code Online (Sandbox Code Playgroud)

postgresql change-data-capture apache-kafka apache-kafka-connect debezium

7
推荐指数
1
解决办法
6631
查看次数

Debezium postgres 增量快照性能问题

我正在尝试在最新的 debezium (1.7) 和 postgres (V13) 中使用 debezium增量快照。为了进行测试,我在一个表中填充了 1M 行,每行 4KB,带有一个 UUID 主键和 20 个 varchar 列。由于我只是想测量快照性能,因此表数据在测试期间不会改变

看起来增量快照比常规快照慢一个数量级。例如,在我的测试中,我观察到使用普通快照的速度为每秒 10,000 个更改事件。然而,我观察到增量快照的速度为每秒 500 个更改事件。

我尝试将其增加到incremental.snapshot.chunk.size10,000,但我没有看到对性能有太大影响。

我只是想确认这是否是已知/预期的问题,还是我做错了什么?

谢谢

postgresql change-data-capture debezium

7
推荐指数
1
解决办法
600
查看次数

Postgres Debezium 不发布记录的先前状态

我成功安装了Postgres Debezium CDC。现在,我能够捕获数据库发生的所有更改。但问题是“之前”字段始终为空。因此,如果我插入一条记录,(id = 1, name = Bill)我就会从 Kafka 获取以下数据:

'payload': {'before': None, 'after': {'id': 1, 'name': 'Bill'}, ...
Run Code Online (Sandbox Code Playgroud)

但如果我像这样更新记录:

UPDATE mytable set name = 'Bob' WHERE id = 1
Run Code Online (Sandbox Code Playgroud)

我从卡夫卡那里得到这个:

'payload': {'before': None, 'after': {'id': 1, 'name': 'Bob'}, ...
Run Code Online (Sandbox Code Playgroud)

这就是我配置连接器的方式:

'payload': {'before': None, 'after': {'id': 1, 'name': 'Bill'}, ...
Run Code Online (Sandbox Code Playgroud)

这是什么问题?我该如何解决?

postgresql apache-kafka-connect debezium

6
推荐指数
1
解决办法
5193
查看次数

Kafka Producer 无法在没有 PK 的情况下验证记录并返回 InvalidRecordException

我的 kafka 制作人有错误。我使用 Debezium Kafka 连接器 V1.1.0 Final 和 Kafka 2.4.1 。对于带有 pk 的表,所有表都清楚地刷新,但不幸的是,对于没有 pk 的表,它给了我这个错误:

[2020-04-14 10:00:00,096] INFO   Exporting data from table 'public.table_0' (io.debezium.relational.RelationalSnapshotChangeEventSource:280)
[2020-04-14 10:00:00,097] INFO   For table 'public.table_0' using select statement: 'SELECT * FROM "public"."table_0"' (io.debezium.relational.RelationalSnapshotChangeEventSource:287)
[2020-04-14 10:00:00,519] INFO   Finished exporting 296 records for table 'public.table_0'; total duration '00:00:00.421' (io.debezium.relational.RelationalSnapshotChangeEventSource:330)
[2020-04-14 10:00:00,522] INFO Snapshot - Final stage (io.debezium.pipeline.source.AbstractSnapshotChangeEventSource:79)
[2020-04-14 10:00:00,523] INFO Snapshot ended with SnapshotResult [status=COMPLETED, offset=PostgresOffsetContext [sourceInfo=source_info[server='postgres'db='xxx, lsn=38/C74913C0, txId=4511542, timestamp=2020-04-14T02:00:00.517Z, snapshot=FALSE, schema=public, table=table_0], partition={server=postgres}, lastSnapshotRecord=true]] (io.debezium.pipeline.ChangeEventSourceCoordinator:90) …
Run Code Online (Sandbox Code Playgroud)

postgresql apache-kafka apache-kafka-connect debezium

6
推荐指数
1
解决办法
3760
查看次数

Postgres 使用 debezium 创建复制槽失败

我使用 postgres 配置了 debezium,如下代码所示,当我启动 Spring Boot 应用程序时出现错误Creation of replication slot failed

@Bean
public io.debezium.config.Configuration connector() {
    return io.debezium.config.Configuration.create()
            .with(EmbeddedEngine.CONNECTOR_CLASS, PostgresConnector.class)
            .with(EmbeddedEngine.OFFSET_STORAGE,  FileOffsetBackingStore.class)
            .with(EmbeddedEngine.OFFSET_STORAGE_FILE_FILENAME, "offset.dat")
            .with(EmbeddedEngine.OFFSET_FLUSH_INTERVAL_MS, 60000)
            .with("name", "test-postgres-connector")
            .with(RelationalDatabaseConnectorConfig.SERVER_NAME, "172.26.113.123:5455-test")
            .with(RelationalDatabaseConnectorConfig.HOSTNAME, "172.26.113.123")
            .with(RelationalDatabaseConnectorConfig.PORT, "5455")
            .with(RelationalDatabaseConnectorConfig.USER, "test")
            .with(RelationalDatabaseConnectorConfig.PASSWORD, "test")
            .with(RelationalDatabaseConnectorConfig.DATABASE_NAME, "test")
            .with(RelationalDatabaseConnectorConfig.TABLE_INCLUDE_LIST, "question").build();
}


2021-04-02 12:58:56.817 ERROR 14672 --- [pool-5-thread-1] io.debezium.embedded.EmbeddedEngine      : Unable to initialize and start connector's task class 'io.debezium.connector.postgresql.PostgresConnectorTask' with config: {name=student-postgres-connector, connector.class=io.debezium.connector.postgresql.PostgresConnector, database.port=5455, offset.storage=org.apache.kafka.connect.storage.FileOffsetBackingStore, table.include.list=question, database.user=pfe, database.hostname=172.26.113.123, offset.storage.file.filename=offset.dat, database.password=********, offset.flush.interval.ms=60000, database.server.name=172.26.113.123:5455-pfe, database.dbname=pfe}

io.debezium.DebeziumException: Creation of replication slot …
Run Code Online (Sandbox Code Playgroud)

postgresql apache-kafka-connect debezium

6
推荐指数
1
解决办法
8447
查看次数