Jac*_*ian 2 postgresql apache-kafka apache-kafka-connect debezium
My current test configuration looks like this:
version: '3.7'
services:
  postgres:
    image: debezium/postgres
    restart: always
    ports:
      - "5432:5432"
  zookeeper:
    image: debezium/zookeeper
    ports:
      - "2181:2181"
      - "2888:2888"
      - "3888:3888"
  kafka:
    image: debezium/kafka
    restart: always
    ports:
      - "9092:9092"
    links:
      - zookeeper
    depends_on:
      - zookeeper
    environment:
      - ZOOKEEPER_CONNECT=zookeeper:2181
      - KAFKA_GROUP_MIN_SESSION_TIMEOUT_MS=250
  connect:
    image: debezium/connect
    restart: always
    ports:
      - "8083:8083"
    links:
      - zookeeper
      - postgres
      - kafka
    depends_on:
      - zookeeper
      - postgres
      - kafka
    environment:
      - BOOTSTRAP_SERVERS=kafka:9092
      - GROUP_ID=1
      - CONFIG_STORAGE_TOPIC=my_connect_configs
      - OFFSET_STORAGE_TOPIC=my_connect_offsets
      - STATUS_STORAGE_TOPIC=my_source_connect_statuses
I run it with docker-compose like this:
$ docker-compose up
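I see no error messages, and everything seems to be working. If I run docker ps, I see all the services running.
To check that Kafka is running, I created a Kafka producer and a Kafka consumer in Python: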
# producer. I run it in one console window
from kafka import KafkaProducer
from json import dumps
from time import sleep
producer = KafkaProducer(bootstrap_servers=['localhost:9092'], value_serializer=lambda x: dumps(x).encode('utf-8'))
for e in range(1000):
    data = {'number' : e}
    producer.send('numtest', value=data)
    sleep(5)
# consumer. I run it in another console window
from kafka import KafkaConsumer
from json import loads
consumer = KafkaConsumer(
    'numtest',
    bootstrap_servers=['localhost:9092'],
    auto_offset_reset='earliest',
    enable_auto_commit=True,
    group_id='my-group',
    value_deserializer=lambda x: loads(x.decode('utf-8')))
for message in consumer:
    print(message)
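It works perfectly. I can see my producer publishing messages, and I can see them being consumed in the consumer window.
Now I want to get CDC working. First, inside the Postgres container, I set the password of the postgres role to postgres: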
$ su postgres
$ psql
psql> \password postgres
Enter new password: postgres
Then I created a new database, test:
psql> CREATE DATABASE test;
I created a table:
psql> \c test;
test=# create table mytable (id serial, name varchar(128), primary key(id));
Finally, for my Debezium CDC stack, I created a connector:
$ curl -X POST -H "Accept:application/json" -H "Content-Type:application/json" localhost:8083/connectors/ -d '{
"name": "test-connector",
"config": {
"connector.class": "io.debezium.connector.postgresql.PostgresConnector",
"tasks.max": "1",
"plugin.name": "pgoutput",
"database.hostname": "postgres",
"database.port": "5432",
"database.user": "postgres",
"database.password": "postgres",
"database.dbname" : "test",
"database.server.name": "postgres",
"database.whitelist": "public.mytable",
"database.history.kafka.bootstrap.servers": "localhost:9092",
"database.history.kafka.topic": "public.some_topic"
}
}'
{"name":"test-connector","config":{"connector.class":"io.debezium.connector.postgresql.PostgresConnector","tasks.max":"1","plugin.name":"pgoutput","database.hostname":"postgres","database.port":"5432","database.user":"postgres","database.password":"postgres","database.dbname":"test","database.server.name":"postgres","database.whitelist":"public.mytable","database.history.kafka.bootstrap.servers":"localhost:9092","database.history.kafka.topic":"public.some_topic","name":"test-connector"},"tasks":[],"type":"source"}
As you can see, my connector was created without any errors. Now I expect Debezium CDC to publish all changes to the Kafka topic public.some_topic. To check this, I created a new Kafka consumer:
from kafka import KafkaConsumer
from json import loads
consumer = KafkaConsumer(
    'public.some_topic',
    bootstrap_servers=['localhost:9092'],
    auto_offset_reset='earliest',
    enable_auto_commit=True,
    group_id='my-group',
    value_deserializer=lambda x: loads(x.decode('utf-8')))
for message in consumer:
    print(message)
The only difference from the first example is that I am watching public.some_topic. Then I went to the database console and did an insert:
test=# insert into mytable (name) values ('Tom Cat');
INSERT 0 1
test=#
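So a new value was inserted, but nothing happens in the consumer window. In other words, Debezium does not publish events to the Kafka topic public.some_topic. What is wrong here, and how can I fix it?
Using your Docker Compose, I see this error in the Kafka Connect worker log when the connector is created: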
Caused by: org.postgresql.util.PSQLException: ERROR: could not access file "pgoutput": No such file or directory
at org.postgresql.core.v3.QueryExecutorImpl.receiveErrorResponse(QueryExecutorImpl.java:2505)
at org.postgresql.core.v3.QueryExecutorImpl.processResults(QueryExecutorImpl.java:2241)
at org.postgresql.core.v3.QueryExecutorImpl.execute(QueryExecutorImpl.java:310)
at org.postgresql.jdbc.PgStatement.executeInternal(PgStatement.java:447)
at org.postgresql.jdbc.PgStatement.execute(PgStatement.java:368)
at org.postgresql.jdbc.PgStatement.executeWithFlags(PgStatement.java:309)
at org.postgresql.jdbc.PgStatement.executeCachedSql(PgStatement.java:295)
at org.postgresql.jdbc.PgStatement.executeWithFlags(PgStatement.java:272)
at org.postgresql.jdbc.PgStatement.execute(PgStatement.java:267)
at io.debezium.connector.postgresql.connection.PostgresReplicationConnection.createReplicationSlot(PostgresReplicationConnection.java:288)
at io.debezium.connector.postgresql.PostgresConnectorTask.start(PostgresConnectorTask.java:126)
... 9 more
If you query it with the Kafka Connect REST API, this is also reflected in the status of the task:
curl -s "http://localhost:8083/connectors?expand=info&expand=status" | jq '."test-connector".status'
{
"name": "test-connector",
"connector": {
"state": "RUNNING",
"worker_id": "192.168.16.5:8083"
},
"tasks": [
{
"id": 0,
"state": "FAILED",
"worker_id": "192.168.16.5:8083",
"trace": "org.apache.kafka.connect.errors.ConnectException: org.postgresql.util.PSQLException: ERROR: could not access file \"pgoutput\": No such file or directory\n\tat io.debezium.connector.postgresql.PostgresConnectorTask.start(PostgresConnectorTask.java:129)\n\tat io.debezium.connector.common.BaseSourceTask.start(BaseSourceTask.java:49)\n\tat org.apache.kafka.connect.runtime.WorkerSourceTask.execute(WorkerSourceTask.java:208)\n\tat org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:177)\n\tat org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:227)\n\tat java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)\n\tat java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)\n\tat java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)\n\tat java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)\n\tat java.base/java.lang.Thread.run(Thread.java:834)\nCaused by: org.postgresql.util.PSQLException: ERROR: could not access file \"pgoutput\": No such file or directory\n\tat org.postgresql.core.v3.QueryExecutorImpl.receiveErrorResponse(QueryExecutorImpl.java:2505)\n\tat org.postgresql.core.v3.QueryExecutorImpl.processResults(QueryExecutorImpl.java:2241)\n\tat org.postgresql.core.v3.QueryExecutorImpl.execute(QueryExecutorImpl.java:310)\n\tat org.postgresql.jdbc.PgStatement.executeInternal(PgStatement.java:447)\n\tat org.postgresql.jdbc.PgStatement.execute(PgStatement.java:368)\n\tat org.postgresql.jdbc.PgStatement.executeWithFlags(PgStatement.java:309)\n\tat org.postgresql.jdbc.PgStatement.executeCachedSql(PgStatement.java:295)\n\tat org.postgresql.jdbc.PgStatement.executeWithFlags(PgStatement.java:272)\n\tat org.postgresql.jdbc.PgStatement.execute(PgStatement.java:267)\n\tat io.debezium.connector.postgresql.connection.PostgresReplicationConnection.createReplicationSlot(PostgresReplicationConnection.java:288)\n\tat 
io.debezium.connector.postgresql.PostgresConnectorTask.start(PostgresConnectorTask.java:126)\n\t... 9 more\n"
}
],
"type": "source"
}
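The same check can be scripted. The following is a minimal Python sketch, assuming the Connect worker is reachable at http://localhost:8083 as in the compose file and using the per-connector `/connectors/<name>/status` endpoint of the Kafka Connect REST API. The helper names `failed_tasks` and `fetch_status` are hypothetical and only serve the illustration.

```python
import json
from urllib.request import urlopen


def failed_tasks(status: dict) -> list:
    """Return (task id, first line of the trace) for every task that is not RUNNING."""
    failures = []
    for task in status.get("tasks", []):
        if task.get("state") != "RUNNING":
            trace = task.get("trace", "")
            first_line = trace.splitlines()[0] if trace else ""
            failures.append((task["id"], first_line))
    return failures


def fetch_status(name: str, base_url: str = "http://localhost:8083") -> dict:
    """Fetch one connector's status document from the Kafka Connect REST API."""
    with urlopen(f"{base_url}/connectors/{name}/status") as response:
        return json.load(response)


# Usage (needs the Connect worker from the compose file to be up):
#   for task_id, reason in failed_tasks(fetch_status("test-connector")):
#       print(f"task {task_id} is not running: {reason}")
```

A connector can report RUNNING while its task is FAILED, as in the output above, so it is worth looking at the task list and not only at the connector state.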
The error means that the version of Postgres you are running is older than 10: pgoutput is only available in version 10 and later.
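One way to confirm this on a running server is to execute `SHOW server_version_num;` in psql and compare the result against 100000, the number reported by PostgreSQL 10.0. The snippet below is a small Python sketch of that comparison; `supports_pgoutput` is a hypothetical helper name, and the sample values are only examples of what a 9.6.x and a 10.x server report.

```python
PGOUTPUT_MIN_VERSION_NUM = 100000  # server_version_num reported by PostgreSQL 10.0


def supports_pgoutput(server_version_num: int) -> bool:
    """True if the server is new enough to ship the built-in pgoutput plugin.

    server_version_num is the integer returned by `SHOW server_version_num;`,
    for example 90616 for 9.6.16 or 100011 for 10.11.
    """
    return server_version_num >= PGOUTPUT_MIN_VERSION_NUM


print(supports_pgoutput(90616))   # a 9.6.x server
print(supports_pgoutput(100011))  # a 10.x server
```

On a server older than 10, the alternative to upgrading would be a different logical decoding plugin instead of pgoutput.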
I changed your Docker Compose to use version 10:
image: debezium/postgres:10
After bouncing the stack for a clean start and following your instructions, I get a running connector:
curl -s "http://localhost:8083/connectors?expand=info&expand=status" | jq '."test-connector".status'
and data in the Kafka topic:
$ docker exec kafkacat kafkacat -b kafka:9092 -t postgres.public.mytable -C
{"schema":{"type":"struct","fields":[{"type":"struct","fields":[{"type":"int32","optional":false,"field":"id"},{"type":"string","optional":true,"field":"name"}],"optional":true,"name":"postgres.public.mytable.Value","field":"before"},{"type":"struct","fields":[{"type":"int32","optional":false,"field":"id"},{"type":"string","optional":true,"field":"name"}],"optional":true,"name":"postgres.public.mytable.Value","field":"after"},{"type":"struct","fields":[{"type":"string","optional":false,"field":"version"},{"type":"string","optional":false,"field":"connector"},{"type":"string","optional":false,"field":"name"},{"type":"int64","optional":false,"field":"ts_ms"},{"type":"string","optional":true,"name":"io.debezium.data.Enum","version":1,"parameters":{"allowed":"true,last,false"},"default":"false","field":"snapshot"},{"type":"string","optional":false,"field":"db"},{"type":"string","optional":false,"field":"schema"},{"type":"string","optional":false,"field":"table"},{"type":"int64","optional":true,"field":"txId"},{"type":"int64","optional":true,"field":"lsn"},{"type":"int64","optional":true,"field":"xmin"}],"optional":false,"name":"io.debezium.connector.postgresql.Source","field":"source"},{"type":"string","optional":false,"field":"op"},{"type":"int64","optional":true,"field":"ts_ms"}],"optional":false,"name":"postgres.public.mytable.Envelope"},"payload":{"before":null,"after":{"id":1,"name":"Tom Cat"},"source":{"version":"1.0.0.Final","connector":"postgresql","name":"postgres","ts_ms":1579172192292,"snapshot":"false","db":"test","schema":"public","table":"mytable","txId":561,"lsn":24485520,"xmin":null},"op":"c","ts_ms":1579172192347}}% Reached end of topic postgres.public.mytable [0] at offset 1
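The message above is a Debezium change event: a "schema" part and a "payload" part, where the payload carries the row images (before, after), the operation code (op, here "c" for an insert) and the source metadata. The following Python sketch shows one way to pull out the interesting fields. It assumes the JSON converter with schemas enabled, as in the output above, and `summarize_change` is a hypothetical helper name.

```python
import json


def summarize_change(raw_value: bytes) -> dict:
    """Extract the operation, table and row images from one Debezium event."""
    event = json.loads(raw_value.decode("utf-8"))
    payload = event["payload"]
    source = payload["source"]
    return {
        "op": payload["op"],  # "c" = create, "u" = update, "d" = delete, "r" = snapshot read
        "table": f'{source["schema"]}.{source["table"]}',
        "before": payload["before"],
        "after": payload["after"],
    }


# Trimmed copy of the event shown above (the schema part is left empty here).
sample = json.dumps({
    "schema": {},
    "payload": {
        "before": None,
        "after": {"id": 1, "name": "Tom Cat"},
        "source": {"db": "test", "schema": "public", "table": "mytable"},
        "op": "c",
    },
}).encode("utf-8")

print(summarize_change(sample))
```

With the kafka-python consumer from the question, the same function could be passed as value_deserializer when reading from postgres.public.mytable.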
I added kafkacat to your Docker Compose:
  kafkacat:
    image: edenhill/kafkacat:1.5.0
    container_name: kafkacat
    entrypoint:
      - /bin/sh
      - -c
      - |
        while [ 1 -eq 1 ];do sleep 60;done
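Edit: leaving the previous answer in place, as it is still useful and relevant:
Debezium writes messages to a topic named after the table. In your example, that is postgres.public.mytable (server name, schema, table).
This is why kafkacat is useful, because you can run
kafkacat -b broker:9092 -L
to see a list of all topics and partitions. Once you have the topic, run
kafkacat -b broker:9092 -t postgres.public.mytable -C
to read from it.
See the kafkacat documentation for details, including how to run it with Docker.
There is also a demo of it running with Docker Compose here.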
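The topic lookup can also be done from Python with the kafka-python client used in the question. The sketch below assumes that the Debezium Postgres connector names its topics `<database.server.name>.<schema>.<table>`, which matches the postgres.public.mytable topic read with kafkacat in this answer. The helper names are hypothetical, and `list_topics` needs a reachable broker.

```python
def debezium_topic(server_name: str, schema: str, table: str) -> str:
    """Build the topic name the Postgres connector is expected to write to."""
    return f"{server_name}.{schema}.{table}"


def topics_for_server(all_topics, server_name: str) -> list:
    """Keep only the topics that belong to one Debezium logical server name."""
    prefix = server_name + "."
    return sorted(t for t in all_topics if t.startswith(prefix))


def list_topics(bootstrap_servers: str = "localhost:9092") -> set:
    """Ask the broker for its topic names (requires kafka-python and a running broker)."""
    from kafka import KafkaConsumer  # imported here so the helpers above work without it

    consumer = KafkaConsumer(bootstrap_servers=bootstrap_servers)
    try:
        return consumer.topics()
    finally:
        consumer.close()


print(debezium_topic("postgres", "public", "mytable"))
```

Against the stack from the question, `topics_for_server(list_topics(), "postgres")` would then show which table topics the connector has created so far.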