Tag: debezium

How do I convert an org.apache.kafka.connect.data.Decimal stored in an Avro file to a Python type?

I'm trying to interpret, in Python, Avro records that Debezium stored in Kafka:

           {
              "name": "id",
              "type": {
                "type": "bytes",
                "scale": 0,
                "precision": 64,
                "connect.version": 1,
                "connect.parameters": {
                  "scale": "0"
                },
                "connect.name": "org.apache.kafka.connect.data.Decimal",
                "logicalType": "decimal"
              }
            }

I'm not sure which Python 3 primitive type this correspondsds to. How do I deserialize this value?

Thanks in advance!
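For reference, Connect's Decimal logical type stores the unscaled integer as big-endian two's-complement bytes, and the value is that integer times 10^-scale; there is no single Python primitive for it, but `decimal.Decimal` is the natural target. A minimal sketch (the scale comes from the schema above):

```python
import decimal

def decode_connect_decimal(raw: bytes, scale: int) -> decimal.Decimal:
    """Decode org.apache.kafka.connect.data.Decimal bytes into a Decimal.

    The bytes hold the unscaled integer as big-endian two's complement;
    the final value is unscaled * 10**-scale.
    """
    unscaled = int.from_bytes(raw, byteorder="big", signed=True)
    return decimal.Decimal(unscaled).scaleb(-scale)

# With scale 0, as in the schema above, the result is an integer-valued Decimal.
print(decode_connect_decimal(b"\x04\xd2", 0))  # 1234
```

With `"scale": "0"` you can safely convert the result to `int` afterwards.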

python apache-kafka apache-kafka-connect debezium

5 votes · 1 answer · 1502 views

Updating a Debezium MySQL connector's table whitelist option

I'm using the Debezium (0.7.5) MySQL connector, and I'm trying to understand the best approach for updating its configuration with the table.whitelist option.

Suppose I created a connector as follows:

curl -i -X POST -H "Accept:application/json" -H  "Content-Type:application/json" http://debezium-host/connectors/ -d '
{
  "name": "MyConnector",
  "config": {
      "connector.class": "io.debezium.connector.mysql.MySqlConnector",
      "connect.timeout.ms": "60000",
      "tasks.max": "1",
      "database.hostname": "myhost",
      "database.port": "3306",
      "database.user": "***",
      "database.password": "***",
      "database.server.id": "3227197",
      "database.server.name": "MyServer",
      "database.whitelist": "myDb",
      "table.whitelist": "myDb.table1,myDb.table2",
      "database.history.kafka.bootstrap.servers": "kb0:9092,kb1:9092,kb2:9092",
      "database.history.kafka.topic": "MyConnectorHistoryTopic",
      "max.batch.size": "1024",
      "snapshot.mode": "initial",
      "decimal.handling.mode": "double"
    }
}'

After some time (2 weeks), I need to add a new table (myDb.table3) to the table.whitelist option (the table is old; it was created before the connector).

What I tried:

  • Paused the connector.
  • Deleted the history topic (maybe that's the problem?).
  • Updated the configuration via the API's update-config endpoint.
  • Resumed the connector.

The update command via the API:

curl -i -X …
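For the record, the Connect REST API updates an existing connector with a PUT to /connectors/{name}/config; a sketch, assuming the same host and connector name as above (the body is the bare config map, without the name/config wrapper used by POST, and PUT replaces the whole config, so every option must be sent again, not just the changed one):

```shell
curl -i -X PUT -H "Accept:application/json" -H "Content-Type:application/json" \
  http://debezium-host/connectors/MyConnector/config -d '
{
  "connector.class": "io.debezium.connector.mysql.MySqlConnector",
  "database.hostname": "myhost",
  "database.whitelist": "myDb",
  "table.whitelist": "myDb.table1,myDb.table2,myDb.table3"
}'
```

Deleting the history topic should not be necessary for a config update; the connector relies on it to rebuild its in-memory schema history on restart.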

mysql apache-kafka apache-kafka-connect debezium

5 votes · 1 answer · 5050 views

Apache Kafka production cluster setup problems

We have been trying to set up a production-grade Kafka cluster on AWS Linux machines, and so far we have not succeeded.

Kafka version: 2.1.0

Machines:

5 r5.xlarge machines for 5 Kafka brokers.
3 t2.medium zookeeper nodes
1 t2.medium node for schema-registry and related tools. (a Single instance of each)
1 m5.xlarge machine for Debezium.

Default broker configuration:

num.partitions=15
min.insync.replicas=1
group.max.session.timeout.ms=2000000 
log.cleanup.policy=compact
default.replication.factor=3
zookeeper.session.timeout.ms=30000

Our problem is mainly related to huge volumes of data. We are trying to stream existing tables into Kafka topics with Debezium. Many of these tables are very large, with more than 50,000,000 rows.

So far we have tried many things, but the cluster fails every time, for one or more of the reasons below.

Error 1:

ERROR Uncaught exception in scheduled task 'isr-expiration' (kafka.utils.KafkaScheduler) org.apache.zookeeper.KeeperException$SessionExpiredException: KeeperErrorCode = Session expired for /brokers/topics/__consumer_offsets/partitions/0/state at org.apache.zookeeper.KeeperException.create(KeeperException.java:130) at org.apache.zookeeper.KeeperException.create(KeeperException.java:54)…

Error 2:

] INFO [Partition xxx.public.driver_operation-14 broker=3] Cached zkVersion [21] not equal to that in zookeeper, skip updating ISR (kafka.cluster.Partition) [2018-12-12 14:07:26,551] INFO [Partition …

apache-kafka apache-kafka-connect debezium

5 votes · 1 answer · 3339 views

Kafka Connect date handling for Debezium-generated events

I'm using the Debezium SQL Server connector to track changes on a production database. The topics are created and CDC works great, but when trying to dump the data into another SQL Server DB with the JdbcSinkConnector, I'm running into the following error:

com.microsoft.sqlserver.jdbc.SQLServerException: One or more values is out of range of values for the datetime2 SQL Server data type

On the source database, the SQL data type is datetime2(7). The Kafka event value is 1549461754650000000; the schema type is INT64 and the schema name is io.debezium.time.NanoTimestamp.

I can't find a way to tell TimestampConverter that the value is expressed in nanoseconds rather than milliseconds (there is no way to use microseconds either).
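One source-side knob worth noting: Debezium emits io.debezium.time.NanoTimestamp because time.precision.mode defaults to adaptive; setting it to connect makes the source connector emit Connect's built-in millisecond Timestamp instead, which the JDBC sink handles natively, at the cost of sub-millisecond precision. A sketch of the relevant fragment of the source connector config (connector name hypothetical):

```json
{
  "name": "cdc.swip.bi.ods.source.contract",
  "config": {
    "connector.class": "io.debezium.connector.sqlserver.SqlServerConnector",
    "time.precision.mode": "connect"
  }
}
```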

Here is my connector configuration:

{
    "name": "cdc.swip.bi.ods.sink.contract",
    "config": {
        "connector.class": "io.confluent.connect.jdbc.JdbcSinkConnector",
        "tasks.max": "1",
        "topics": "swip.swip_core.contract",
        "connection.url": "jdbc:sqlserver://someip:1234;database=DB",
        "connection.user": "loloolololo",
        "connection.password": "muahahahahaha",
        "dialect.name": "SqlServerDatabaseDialect",
        "auto.create": "false",
        "key.converter": "io.confluent.connect.avro.AvroConverter",
        "value.converter": "io.confluent.connect.avro.AvroConverter",
        "key.converter.schemas.enable": "true",
        "key.converter.schema.registry.url": "http://localhost:8081",
        "value.converter.schemas.enable": "true",
        "value.converter.schema.registry.url": "http://localhost:8081",
        "transforms": "unwrap,created_date,modified_date",
        "transforms.unwrap.type": "io.debezium.transforms.UnwrapFromEnvelope", …

apache-kafka apache-kafka-connect debezium

5 votes · 1 answer · 1570 views

Kafka 连接 Debezium Postgres Cloud SQL

I'm trying to connect the Debezium Postgres connector to a Cloud SQL Postgres instance. I get the following error in the logs:

connect_1    | org.apache.kafka.connect.errors.ConnectException: org.postgresql.util.PSQLException: FATAL: must be superuser or replication role to start walsender
connect_1    |  at io.debezium.connector.postgresql.PostgresConnectorTask.start(PostgresConnectorTask.java:127)

It occurred to me that I need to add REPLICATION to my user's role. However, the Cloud SQL documentation states the following:

When you create a new Cloud SQL for PostgreSQL instance, the default postgres user is created for you, but you must set its password. The postgres user is part of the cloudsqlsuperuser role and has the following attributes (privileges): CREATEROLE, CREATEDB, and LOGIN. It does not have the SUPERUSER or REPLICATION attributes.

Because of this, I cannot ALTER ROLE to add REPLICATION.
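For what it's worth, Cloud SQL never hands out SUPERUSER, but its logical replication docs do allow a cloudsqlsuperuser member (such as the default postgres user) to grant the REPLICATION attribute directly, once the cloudsql.logical_decoding flag is enabled on the instance. A sketch, assuming a connector user named debezium:

```sql
-- Run as the default postgres user (a cloudsqlsuperuser member);
-- requires the instance flag cloudsql.logical_decoding=on.
ALTER USER debezium WITH REPLICATION;
```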

P.S.: the Cloud SQL instance is configured for high availability.

postgresql google-cloud-sql apache-kafka google-cloud-platform debezium

5 votes · 1 answer · 1908 views

Filtering records by a certain value in Kafka Connect

I've been trying to find a way to include a filter in a Debezium SQL source connector that discards records with a certain value. Say I have the JSON {"id":0, "name":"muj"} and I want to discard every record whose "name" field has the value "muj". Is there a way to do this without having to use the Confluent Platform?
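One route that avoids the Confluent Platform entirely: Debezium ships a scripting-based Filter SMT (io.debezium.transforms.Filter) that keeps only records whose condition evaluates to true; it requires a JSR-223 engine such as Groovy on the Connect classpath. A hedged sketch of the relevant config fragment for the "name != muj" case (Debezium envelope layout assumed):

```json
{
  "transforms": "filter",
  "transforms.filter.type": "io.debezium.transforms.Filter",
  "transforms.filter.language": "jsr223.groovy",
  "transforms.filter.condition": "value.after.name != 'muj'"
}
```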

sql-server json apache-kafka apache-kafka-connect debezium

5 votes · 0 answers · 1617 views

Ordering of initial snapshot events in Debezium

In our application we need to publish events from a certain PostgreSQL table into Kafka, so we decided to use Debezium, but we ran into the following problem: during the initial snapshot, messages appear in Kafka in an unexpected (from our point of view) order. The order of events is critical for our application; in fact they must be ordered by the table's integer primary key. AFAIK the initial snapshot is just a SELECT from the table, with no ORDER BY. So is there a way, or a workaround, to make the Debezium PostgreSQL connector extract events in a specific order?
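Debezium itself doesn't expose an ORDER BY for the snapshot SELECT, so a common workaround is to buffer the snapshot-phase events (op "r") and re-sort them downstream before processing. A minimal Python sketch, assuming the unwrapped Debezium envelope with an integer primary key named id:

```python
def order_snapshot_events(events):
    """Sort snapshot-phase events by the integer primary key in 'after'.

    Assumes each event is an unwrapped Debezium envelope dict with an
    'after' image containing the primary-key column 'id'.
    """
    return sorted(events, key=lambda e: e["after"]["id"])

events = [
    {"op": "r", "after": {"id": 3}},
    {"op": "r", "after": {"id": 1}},
    {"op": "r", "after": {"id": 2}},
]
print([e["after"]["id"] for e in order_snapshot_events(events)])  # [1, 2, 3]
```

This only works if the consumer can tell where the snapshot ends (e.g. via the source block's snapshot field) so it knows when the buffer is complete.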

Thanks in advance!

postgresql debezium

4 votes · 1 answer · 1972 views

How to modify a Kafka connector?

I created a Debezium connector on Docker using curl in the terminal, but I'm stuck on modifying an existing connector.

My Docker Compose file:

---
version: '3'
services:

  kafka-connect-02:
    image: confluentinc/cp-kafka-connect:latest
    container_name: kafka-connect-02
    ports:
      - 8083:8083
    environment:
      CONNECT_LOG4J_APPENDER_STDOUT_LAYOUT_CONVERSIONPATTERN: "[%d] %p %X{connector.context}%m (%c:%L)%n"
      CONNECT_CUB_KAFKA_TIMEOUT: 300
      CONNECT_BOOTSTRAP_SERVERS: "https://***9092"
      CONNECT_REST_ADVERTISED_HOST_NAME: 'kafka-connect-02'
      CONNECT_REST_PORT: 8083
      CONNECT_GROUP_ID: kafka-connect-group-01-v04
      CONNECT_CONFIG_STORAGE_TOPIC: _kafka-connect-group-01-v04-configs
      CONNECT_OFFSET_STORAGE_TOPIC: _kafka-connect-group-01-v04-offsets
      CONNECT_STATUS_STORAGE_TOPIC: _kafka-connect-group-01-v04-status
      CONNECT_KEY_CONVERTER: io.confluent.connect.avro.AvroConverter
      CONNECT_KEY_CONVERTER_SCHEMA_REGISTRY_URL: "https://***9092"
      CONNECT_KEY_CONVERTER_BASIC_AUTH_CREDENTIALS_SOURCE: "USER_INFO"
      CONNECT_KEY_CONVERTER_SCHEMA_REGISTRY_BASIC_AUTH_USER_INFO: "***:***"
      CONNECT_VALUE_CONVERTER: io.confluent.connect.avro.AvroConverter
      CONNECT_VALUE_CONVERTER_SCHEMA_REGISTRY_URL: "https://***9092"
      CONNECT_VALUE_CONVERTER_BASIC_AUTH_CREDENTIALS_SOURCE: "USER_INFO"
      CONNECT_VALUE_CONVERTER_SCHEMA_REGISTRY_BASIC_AUTH_USER_INFO: "***:***"
      CONNECT_INTERNAL_KEY_CONVERTER: 'org.apache.kafka.connect.json.JsonConverter'
      CONNECT_INTERNAL_VALUE_CONVERTER: 'org.apache.kafka.connect.json.JsonConverter'
      CONNECT_LOG4J_ROOT_LOGLEVEL: 'INFO'
      CONNECT_LOG4J_LOGGERS: 'org.apache.kafka.connect.runtime.rest=WARN,org.reflections=ERROR'
      CONNECT_CONFIG_STORAGE_REPLICATION_FACTOR: '3'
      CONNECT_OFFSET_STORAGE_REPLICATION_FACTOR: '3'
      CONNECT_STATUS_STORAGE_REPLICATION_FACTOR: '3'
      CONNECT_PLUGIN_PATH: '/usr/share/java,/usr/share/confluent-hub-components/'
      # Confluent Cloud config
      CONNECT_REQUEST_TIMEOUT_MS: "20000"
      CONNECT_RETRY_BACKOFF_MS: …

apache-kafka apache-kafka-connect debezium

4 votes · 1 answer · 4786 views

Does Debezium provide delivery and ordering guarantees?

I'm planning to produce events into Kafka, and I need strong delivery and ordering guarantees. With a plain producer I can get these guarantees by setting the enable.idempotence=true parameter in the producer configuration.

My questions are:

  1. How can I achieve this with Debezium?
  2. Are these guarantees provided by default?
  3. Can they be configured? How?

Note: I'm planning to use the Debezium Connector for PostgreSQL.
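For context, Debezium runs inside Kafka Connect, so these guarantees come from the Connect worker's underlying producer, not from the connector itself. Connect lets you set producer properties globally in the worker config with a producer. prefix (or per connector via producer.override.*, if the worker's connector.client.config.override.policy permits it). A sketch of the worker-side settings, assuming a distributed worker properties file:

```properties
# connect-distributed.properties (worker config; file name assumed)
producer.enable.idempotence=true
producer.acks=all
producer.max.in.flight.requests.per.connection=5
```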

postgresql apache-kafka apache-kafka-connect debezium

4 votes · 1 answer · 3006 views

How to disable the JSON schema in a Kafka source connector (e.g. Debezium)

I followed the Debezium tutorial (https://github.com/debezium/debezium-examples/tree/master/tutorial#using-postgres), and all the CDC data received from Postgres is sent to a Kafka topic in JSON format, with the schema included. How do I get rid of the schema?

Here is the connector's configuration (started in a Docker container):

{
    "name": "inventory-connector",
    "config": {
        "connector.class": "io.debezium.connector.postgresql.PostgresConnector",
        "tasks.max": "1",
        "key.converter.schemas.enable": "false",
        "value.converter.schemas.enable": "false",
        "database.hostname": "postgres",
        "database.port": "5432",
        "database.user": "postgres",
        "database.password": "postgres",
        "database.dbname" : "postgres",
        "database.server.name": "dbserver1",
        "schema.include": "inventory"
    }
}

The JSON schema is still in the messages. I only managed to get rid of it by starting the Docker container with the following environment variables:

 - CONNECT_KEY_CONVERTER_SCHEMAS_ENABLE=false
 - CONNECT_VALUE_CONVERTER_SCHEMAS_ENABLE=false

Why can't I achieve exactly the same effect through the connector configuration?
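One likely explanation worth checking: converter sub-properties in a connector config only take effect when the connector also overrides the converter class itself. The config above sets key/value.converter.schemas.enable but no key.converter or value.converter, so the worker's converter instance (configured via the container's CONNECT_* environment variables) is used and the stray schemas.enable keys are ignored. A sketch of the connector-level override that should have the same effect as the environment variables:

```json
{
  "key.converter": "org.apache.kafka.connect.json.JsonConverter",
  "key.converter.schemas.enable": "false",
  "value.converter": "org.apache.kafka.connect.json.JsonConverter",
  "value.converter.schemas.enable": "false"
}
```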

Example Kafka message with the schema:

{"schema":{"type":"struct","fields":[{"type":"int32","optional":false,"field":"id"}],"optional":false,"name":"dbserver1.inventory.customers.Key"},"payload":{"id":1001}}    {"schema":{"type":"struct","fields":[{"type":"struct","fields":[{"type":"int32","optional":false,"field":"id"},{"type":"string","optional":false,"field":"first_name"},{"type":"string","optional":false,"field":"last_name"},{"type":"string","optional":false,"field":"email"}],"optional":true,"name":"dbserver1.inventory.customers.Value","field":"before"},{"type":"struct","fields":[{"type":"int32","optional":false,"field":"id"},{"type":"string","optional":false,"field":"first_name"},{"type":"string","optional":false,"field":"last_name"},{"type":"string","optional":false,"field":"email"}],"optional":true,"name":"dbserver1.inventory.customers.Value","field":"after"},{"type":"struct","fields":[{"type":"string","optional":false,"field":"version"},{"type":"string","optional":false,"field":"connector"},{"type":"string","optional":false,"field":"name"},{"type":"int64","optional":false,"field":"ts_ms"},{"type":"string","optional":true,"name":"io.debezium.data.Enum","version":1,"parameters":{"allowed":"true,last,false"},"default":"false","field":"snapshot"},{"type":"string","optional":false,"field":"db"},{"type":"string","optional":false,"field":"schema"},{"type":"string","optional":false,"field":"table"},{"type":"int64","optional":true,"field":"txId"},{"type":"int64","optional":true,"field":"lsn"},{"type":"int64","optional":true,"field":"xmin"}],"optional":false,"name":"io.debezium.connector.postgresql.Source","field":"source"},{"type":"string","optional":false,"field":"op"},{"type":"int64","optional":true,"field":"ts_ms"},{"type":"struct","fields":[{"type":"string","optional":false,"field":"id"},{"type":"int64","optional":false,"field":"total_order"},{"type":"int64","optional":false,"field":"data_collection_order"}],"optional":true,"field":"transaction"}],"optional":false,"name":"dbserver1.inventory.customers
.Envelope"},"payload":{"before":null,"after":{"id":1001,"first_name":"Sally","last_name":"Thomas","email":"sally.thomas@acme.com"},"source":{"version":"1.4.1.Final","connector":"postgresql","name":"dbserver1","ts_ms":1611918971029,"snapshot":"true","db":"postgres","schema":"inventory","table":"customers","txId":602,"lsn":34078720,"xmin":null},"op":"r","ts_ms":1611918971032,"transaction":null}}

Example (what I want) without the schema:

{"id":1001} {"before":null,"after":{"id":1001,"first_name":"Sally","last_name":"Thomas","email":"sally.thomas@acme.com"},"source":{"version":"1.4.1.Final","connector":"postgresql","name":"dbserver1","ts_ms":1611920304594,"snapshot":"true","db":"postgres","schema":"inventory","table":"customers","txId":597,"lsn":33809448,"xmin":null},"op":"r","ts_ms":1611920304596,"transaction":null}

The Debezium container is run with the following command:

docker run -it --name connect -p 8083:8083 -e GROUP_ID=1 -e …

jsonschema apache-kafka apache-kafka-connect debezium

4 votes · 1 answer · 5646 views