I'm trying to interpret, in Python, the Avro records that Debezium stores in Kafka:
{
  "name": "id",
  "type": {
    "type": "bytes",
    "scale": 0,
    "precision": 64,
    "connect.version": 1,
    "connect.parameters": {
      "scale": "0"
    },
    "connect.name": "org.apache.kafka.connect.data.Decimal",
    "logicalType": "decimal"
  }
}
I'm not sure which Python 3 primitive type this corresponds to. How do I deserialize this value?
Thanks in advance!
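For context, Kafka Connect's `Decimal` logical type encodes the unscaled value as a big-endian two's-complement byte array, with the scale carried in the schema's `connect.parameters`. A minimal decoding sketch in Python 3 (not tied to any particular Avro library):

```python
from decimal import Decimal

def decode_connect_decimal(raw: bytes, scale: int = 0) -> Decimal:
    # org.apache.kafka.connect.data.Decimal stores the unscaled value as a
    # big-endian two's-complement byte array; scale comes from the schema.
    unscaled = int.from_bytes(raw, byteorder="big", signed=True)
    return Decimal(unscaled).scaleb(-scale)

print(decode_connect_decimal(b"\x01\x00", scale=0))  # 256
```

With scale 0, the closest Python primitive is effectively `int`; for nonzero scales, `decimal.Decimal` avoids float rounding.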
I'm using the Debezium (0.7.5) MySQL connector, and I'm trying to understand the best approach for updating its configuration with the table.whitelist option.
Suppose I created a connector as follows:
curl -i -X POST -H "Accept:application/json" -H "Content-Type:application/json" http://debezium-host/connectors/ -d '
{
  "name": "MyConnector",
  "config": {
    "connector.class": "io.debezium.connector.mysql.MySqlConnector",
    "connect.timeout.ms": "60000",
    "tasks.max": "1",
    "database.hostname": "myhost",
    "database.port": "3306",
    "database.user": "***",
    "database.password": "***",
    "database.server.id": "3227197",
    "database.server.name": "MyServer",
    "database.whitelist": "myDb",
    "table.whitelist": "myDb.table1,myDb.table2",
    "database.history.kafka.bootstrap.servers": "kb0:9092,kb1:9092,kb2:9092",
    "database.history.kafka.topic": "MyConnectorHistoryTopic",
    "max.batch.size": "1024",
    "snapshot.mode": "initial",
    "decimal.handling.mode": "double"
  }
}'
Some time later (2 weeks), I need to add a new table (myDb.table3) to the table.whitelist option (the table is old; it was created before the connector).
What I tried:
An update command via the API:
curl -i -X …
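For reference, a sketch of how such an update could be scripted against the Kafka Connect REST API (`PUT /connectors/<name>/config` takes the flat config map, without the outer `name`/`config` envelope). The host name and the merge helper below are illustrative, not part of the original setup:

```python
import json
from urllib import request

def add_to_whitelist(config: dict, table: str) -> dict:
    """Return a copy of the connector config with `table` appended
    to the comma-separated table.whitelist (if not already present)."""
    tables = [t for t in config.get("table.whitelist", "").split(",") if t]
    if table not in tables:
        tables.append(table)
    updated = dict(config)
    updated["table.whitelist"] = ",".join(tables)
    return updated

def put_config(host: str, name: str, config: dict) -> None:
    # PUT the flat config map to the Connect REST API.
    req = request.Request(
        f"{host}/connectors/{name}/config",
        data=json.dumps(config).encode(),
        headers={"Content-Type": "application/json"},
        method="PUT",
    )
    request.urlopen(req)
```

Note that updating the whitelist alone does not necessarily snapshot the newly added table's existing rows; whether a restart or a fresh snapshot is needed depends on the connector version and snapshot settings.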
We have been trying to set up a production-grade Kafka cluster on AWS Linux machines, and so far we have had no success.
Kafka version: 2.1.0
Machines:
5 r5.xlarge machines for 5 Kafka brokers.
3 t2.medium zookeeper nodes
1 t2.medium node for schema-registry and related tools. (a Single instance of each)
1 m5.xlarge machine for Debezium.
Default broker configuration:
num.partitions=15
min.insync.replicas=1
group.max.session.timeout.ms=2000000
log.cleanup.policy=compact
default.replication.factor=3
zookeeper.session.timeout.ms=30000
Our problem is mainly related to the huge volume of data. We are trying to stream existing tables into Kafka topics using Debezium. Many of these tables are very large, with more than 50,000,000 rows.
So far we have tried many things, but our cluster fails every time for one or more reasons.
Error 1:
ERROR Uncaught exception in scheduled task 'isr-expiration' (kafka.utils.KafkaScheduler)
org.apache.zookeeper.KeeperException$SessionExpiredException: KeeperErrorCode = Session expired for /brokers/topics/__consumer_offsets/partitions/0/state
    at org.apache.zookeeper.KeeperException.create(KeeperException.java:130)
    at org.apache.zookeeper.KeeperException.create(KeeperException.java:54)..
Error 2:
] INFO [Partition xxx.public.driver_operation-14 broker=3] Cached zkVersion [21] not equal to that in zookeeper, skip updating ISR (kafka.cluster.Partition) [2018-12-12 14:07:26,551] INFO [Partition …
I'm using Debezium SQL Server to track changes on a production database. The topics get created and CDC works beautifully, but when trying to dump the data into another SQL Server DB using the JdbcSinkConnector, I run into the following error:
com.microsoft.sqlserver.jdbc.SQLServerException: One or more values is out of range of values for the datetime2 SQL Server data type
On the source database, the SQL data type is timestamp2(7). The Kafka event value is 1549461754650000000, the schema type is INT64, and the schema name is io.debezium.time.NanoTimestamp.
I can't find a way to tell TimestampConverter that the value is expressed in nanoseconds rather than milliseconds (microseconds don't work either).
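As a consumer-side workaround (a sketch, not a TimestampConverter replacement): `io.debezium.time.NanoTimestamp` carries nanoseconds past the epoch, so the value can be converted manually before writing to the sink. Python's `datetime` only resolves microseconds, so the nanosecond remainder below is truncated to microseconds:

```python
from datetime import datetime, timezone

def nano_to_datetime(nanos: int) -> datetime:
    # Split epoch nanoseconds into whole seconds plus a sub-second
    # remainder, then truncate the remainder to microseconds.
    seconds, rem_nanos = divmod(nanos, 1_000_000_000)
    return datetime.fromtimestamp(seconds, tz=timezone.utc).replace(
        microsecond=rem_nanos // 1_000
    )

print(nano_to_datetime(1549461754650000000))  # 2019-02-06 14:02:34.650000+00:00
```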
Here is my connector configuration:
{
  "name": "cdc.swip.bi.ods.sink.contract",
  "config": {
    "connector.class": "io.confluent.connect.jdbc.JdbcSinkConnector",
    "tasks.max": "1",
    "topics": "swip.swip_core.contract",
    "connection.url": "jdbc:sqlserver://someip:1234;database=DB",
    "connection.user": "loloolololo",
    "connection.password": "muahahahahaha",
    "dialect.name": "SqlServerDatabaseDialect",
    "auto.create": "false",
    "key.converter": "io.confluent.connect.avro.AvroConverter",
    "value.converter": "io.confluent.connect.avro.AvroConverter",
    "key.converter.schemas.enable": "true",
    "key.converter.schema.registry.url": "http://localhost:8081",
    "value.converter.schemas.enable": "true",
    "value.converter.schema.registry.url": "http://localhost:8081",
    "transforms": "unwrap,created_date,modified_date",
    "transforms.unwrap.type": "io.debezium.transforms.UnwrapFromEnvelope", …
I'm trying to connect to a Cloud SQL Postgres instance using the Debezium Postgres connector. I get the following error in the logs:
connect_1 | org.apache.kafka.connect.errors.ConnectException: org.postgresql.util.PSQLException: FATAL: must be superuser or replication role to start walsender
connect_1 | at io.debezium.connector.postgresql.PostgresConnectorTask.start(PostgresConnectorTask.java:127)
It occurred to me that I need to add REPLICATION to my user's role. However, the Cloud SQL documentation states the following:
"When you create a new Cloud SQL for PostgreSQL instance, the default postgres user is created for you, but you must set its password. The postgres user is part of the cloudsqlsuperuser role and has the following attributes (privileges): CREATEROLE, CREATEDB, and LOGIN. It does not have the SUPERUSER or REPLICATION attributes."
Because of this, I cannot ALTER ROLE to add REPLICATION.
P.S.: The Cloud SQL instance is configured for high availability.
I've been trying to find a way to include a filter in the Debezium source SQL connector that discards records with a particular value. Say I have the JSON {"id":0, "name":"muj"} and I want to discard all records whose "name" field is "muj". Is there a way to do this without having to use the Confluent platform?
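If nothing connector-side is available, one workaround is to filter on the consumer side before processing; a minimal sketch using only the standard library (no Confluent components):

```python
import json

def keep_record(raw: bytes) -> bool:
    # Drop any record whose "name" field equals "muj".
    record = json.loads(raw)
    return record.get("name") != "muj"

print(keep_record(b'{"id":0, "name":"muj"}'))  # False
print(keep_record(b'{"id":1, "name":"abc"}'))  # True
```

Newer Debezium releases also ship a filter SMT (`io.debezium.transforms.Filter`), which may be an option if upgrading is possible.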
In our application we need to publish events from a certain PostgreSQL table into Kafka, so we decided to use Debezium, but we ran into the following problem: during the initial snapshot, messages show up in Kafka in an (from our point of view) unexpected order. The order of events is critical for our application; in fact, they must be ordered by the table's integer primary key. AFAIK the initial snapshot is just a SELECT from the table, without an ORDER BY. So is there a way or workaround to make the Debezium PostgreSQL connector emit events in a specific order?
Thanks in advance!
I created a Debezium connector on Docker using curl in the terminal, but I'm stuck on modifying the existing connector.
My Docker Compose file:
---
version: '3'
services:
  kafka-connect-02:
    image: confluentinc/cp-kafka-connect:latest
    container_name: kafka-connect-02
    ports:
      - 8083:8083
    environment:
      CONNECT_LOG4J_APPENDER_STDOUT_LAYOUT_CONVERSIONPATTERN: "[%d] %p %X{connector.context}%m (%c:%L)%n"
      CONNECT_CUB_KAFKA_TIMEOUT: 300
      CONNECT_BOOTSTRAP_SERVERS: "https://***9092"
      CONNECT_REST_ADVERTISED_HOST_NAME: 'kafka-connect-02'
      CONNECT_REST_PORT: 8083
      CONNECT_GROUP_ID: kafka-connect-group-01-v04
      CONNECT_CONFIG_STORAGE_TOPIC: _kafka-connect-group-01-v04-configs
      CONNECT_OFFSET_STORAGE_TOPIC: _kafka-connect-group-01-v04-offsets
      CONNECT_STATUS_STORAGE_TOPIC: _kafka-connect-group-01-v04-status
      CONNECT_KEY_CONVERTER: io.confluent.connect.avro.AvroConverter
      CONNECT_KEY_CONVERTER_SCHEMA_REGISTRY_URL: "https://***9092"
      CONNECT_KEY_CONVERTER_BASIC_AUTH_CREDENTIALS_SOURCE: "USER_INFO"
      CONNECT_KEY_CONVERTER_SCHEMA_REGISTRY_BASIC_AUTH_USER_INFO: "***:***"
      CONNECT_VALUE_CONVERTER: io.confluent.connect.avro.AvroConverter
      CONNECT_VALUE_CONVERTER_SCHEMA_REGISTRY_URL: "https://***9092"
      CONNECT_VALUE_CONVERTER_BASIC_AUTH_CREDENTIALS_SOURCE: "USER_INFO"
      CONNECT_VALUE_CONVERTER_SCHEMA_REGISTRY_BASIC_AUTH_USER_INFO: "***:***"
      CONNECT_INTERNAL_KEY_CONVERTER: 'org.apache.kafka.connect.json.JsonConverter'
      CONNECT_INTERNAL_VALUE_CONVERTER: 'org.apache.kafka.connect.json.JsonConverter'
      CONNECT_LOG4J_ROOT_LOGLEVEL: 'INFO'
      CONNECT_LOG4J_LOGGERS: 'org.apache.kafka.connect.runtime.rest=WARN,org.reflections=ERROR'
      CONNECT_CONFIG_STORAGE_REPLICATION_FACTOR: '3'
      CONNECT_OFFSET_STORAGE_REPLICATION_FACTOR: '3'
      CONNECT_STATUS_STORAGE_REPLICATION_FACTOR: '3'
      CONNECT_PLUGIN_PATH: '/usr/share/java,/usr/share/confluent-hub-components/'
      # Confluent Cloud config
      CONNECT_REQUEST_TIMEOUT_MS: "20000"
      CONNECT_RETRY_BACKOFF_MS: …
I plan to use Debezium to produce events to Kafka,
and I need strong delivery and ordering guarantees. By setting the enable.idempotence=true parameter in the producer configuration, I can obtain those guarantees.
My question is:
I followed the Debezium tutorial (https://github.com/debezium/debezium-examples/tree/master/tutorial#using-postgres), and all CDC data received from Postgres is sent to a Kafka topic in JSON format, with a schema. How do I get rid of the schema?
Here is the connector configuration (started in a Docker container):
{
  "name": "inventory-connector",
  "config": {
    "connector.class": "io.debezium.connector.postgresql.PostgresConnector",
    "tasks.max": "1",
    "key.converter.schemas.enable": "false",
    "value.converter.schemas.enable": "false",
    "database.hostname": "postgres",
    "database.port": "5432",
    "database.user": "postgres",
    "database.password": "postgres",
    "database.dbname" : "postgres",
    "database.server.name": "dbserver1",
    "schema.include": "inventory"
  }
}
The JSON schema is still in the message. I only managed to get rid of it by starting the Docker container with the following environment variables:
- CONNECT_KEY_CONVERTER_SCHEMAS_ENABLE=false
- CONNECT_VALUE_CONVERTER_SCHEMAS_ENABLE=false
Why can't I achieve exactly the same effect through the connector configuration?
Example of a Kafka message with the schema:
{"schema":{"type":"struct","fields":[{"type":"int32","optional":false,"field":"id"}],"optional":false,"name":"dbserver1.inventory.customers.Key"},"payload":{"id":1001}} {"schema":{"type":"struct","fields":[{"type":"struct","fields":[{"type":"int32","optional":false,"field":"id"},{"type":"string","optional":false,"field":"first_name"},{"type":"string","optional":false,"field":"last_name"},{"type":"string","optional":false,"field":"email"}],"optional":true,"name":"dbserver1.inventory.customers.Value","field":"before"},{"type":"struct","fields":[{"type":"int32","optional":false,"field":"id"},{"type":"string","optional":false,"field":"first_name"},{"type":"string","optional":false,"field":"last_name"},{"type":"string","optional":false,"field":"email"}],"optional":true,"name":"dbserver1.inventory.customers.Value","field":"after"},{"type":"struct","fields":[{"type":"string","optional":false,"field":"version"},{"type":"string","optional":false,"field":"connector"},{"type":"string","optional":false,"field":"name"},{"type":"int64","optional":false,"field":"ts_ms"},{"type":"string","optional":true,"name":"io.debezium.data.Enum","version":1,"parameters":{"allowed":"true,last,false"},"default":"false","field":"snapshot"},{"type":"string","optional":false,"field":"db"},{"type":"string","optional":false,"field":"schema"},{"type":"string","optional":false,"field":"table"},{"type":"int64","optional":true,"field":"txId"},{"type":"int64","optional":true,"field":"lsn"},{"type":"int64","optional":true,"field":"xmin"}],"optional":false,"name":"io.debezium.connector.postgresql.Source","field":"source"},{"type":"string","optional":false,"field":"op"},{"type":"int64","optional":true,"field":"ts_ms"},{"type":"struct","fields":[{"type":"string","optional":false,"field":"id"},{"type":"int64","optional":false,"field":"total_order"},{"type":"int64","optional":false,"field":"data_collection_order"}],"optional":true,"field":"transaction"}],"optional":false,"name":"dbserver1.inventory.customers.Envelope"},"payload":{"before":null,"after":{"id":1001,"first_name":"Sally","last_name":"Thomas","email":"sally.thomas@acme.com"},"source":{"version":"1.4.1.Final","connector":"postgresql","name":"dbserver1","ts_ms":1611918971029,"snapshot":"true","db":"postgres","schema":"inventory","table":"customers","txId":602,"lsn":34078720,"xmin":null},"op":"r","ts_ms":1611918971032,"transaction":null}}
Example (what I want) without the schema:
{"id":1001} {"before":null,"after":{"id":1001,"first_name":"Sally","last_name":"Thomas","email":"sally.thomas@acme.com"},"source":{"version":"1.4.1.Final","connector":"postgresql","name":"dbserver1","ts_ms":1611920304594,"snapshot":"true","db":"postgres","schema":"inventory","table":"customers","txId":597,"lsn":33809448,"xmin":null},"op":"r","ts_ms":1611920304596,"transaction":null}
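If the converter settings can't be changed, the envelope can also be stripped after the fact on the consumer side; a minimal sketch:

```python
import json

def strip_envelope(message: bytes) -> dict:
    # With schemas enabled, JsonConverter wraps each message as
    # {"schema": {...}, "payload": {...}}; keep only the payload.
    parsed = json.loads(message)
    return parsed.get("payload", parsed)

print(strip_envelope(b'{"schema":{"type":"struct"},"payload":{"id":1001}}'))  # {'id': 1001}
```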
The Debezium container is run with the following command:
docker run -it --name connect -p 8083:8083 -e GROUP_ID=1 -e …