如何将 Kafka Connect 适配器与 Amazon MSK 配合使用?根据 AWS 文档,它支持 Kafka 连接,但没有记录如何设置适配器和使用它。
我正在使用 kafka-python 为 Kafka 2.2.1 集群(来自 AWS 的 MSK 服务的托管集群实例)生成消息。我能够检索引导服务器并与它们建立网络连接,但没有消息通过。相反,在 Type 的每条消息之后,A
我立即收到一个 type B
... 并最终收到一个type C
:
A [INFO] 2019-11-19T15:17:19.603Z <BrokerConnection ... <connecting> [IPv4 ('10.0.128.56', 9094)]>: Connection complete.
B [ERROR] 2019-11-19T15:17:19.605Z <BrokerConnection ... <connected> [IPv4 ('10.0.128.56', 9094)]>: socket disconnected
C [ERROR] KafkaTimeoutError: KafkaTimeoutError: Failed to update metadata after 60.0 secs.
Run Code Online (Sandbox Code Playgroud)
是什么导致代理节点接受来自有希望的生产者的 TCP 连接,然后立即再次关闭它?
编辑
该主题已存在,并kafka-topics.sh --list
显示它。
我用过的所有客户端都遇到同样的问题:Kafka's kafka-console-producer.sh
、kafka-python、confluent-kafka和kafkacat
Kafka 集群与我的所有其他机器在同一个 VPC 中,它的安全组允许该 VPC 内的任何传入和传出流量。
但是,它由 Amazon 的 Managed Streaming …
python amazon-web-services apache-kafka kafka-producer-api amazon-msk
我在 AWS 中有一个正在运行的 debezium 集群,没有问题。我想尝试一下 AWS MSK。所以我启动了一个集群。然后我启动了一个 EC2 来运行我的连接器。
然后安装 confluent-kafka
sudo apt-get update && sudo apt-get install confluent-platform-2.12
Run Code Online (Sandbox Code Playgroud)
默认情况下,AWS MSK 没有架构注册表,所以我从连接器 EC2 架构注册表 conf 文件配置了它:
kafkastore.connection.url=z-1.bhuvi-XXXXXXXXX.amazonaws.com:2181,z-3.bhuvi-XXXXXXXXX.amazonaws.com:2181,z-2.bhuvi-XXXXXXXXX.amazonaws.com:2181
kafkastore.bootstrap.servers=PLAINTEXT://b-2.bhuvi-XXXXXXXXX.amazonaws.com:9092,PLAINTEXT://b-4.bhuvi-XXXXXXXXX.amazonaws.com:9092,PLAINTEXT://b-1.bhuvi-XXXXXXXXX.amazonaws.com:9092
Run Code Online (Sandbox Code Playgroud)
然后/etc/kafka/connect-distributed.properties
归档
bootstrap.servers=b-4.bhuvi-XXXXXXXXX.amazonaws.com:9092,b-3.bhuvi-XXXXXXXXX.amazonaws.com:9092,b-2.bhuvi-XXXXXXXXX.amazonaws.com:9092
plugin.path=/usr/share/java,/usr/share/confluent-hub-components
Run Code Online (Sandbox Code Playgroud)
confluent-hub install debezium/debezium-connector-mysql:latest
Run Code Online (Sandbox Code Playgroud)
systemctl start confluent-schema-registry
systemctl start confluent-connect-distributed
Run Code Online (Sandbox Code Playgroud)
现在一切都开始了。然后我创建了一个 mysql.json 文件。
{
"name": "mysql-connector-db01",
"config": {
"name": "mysql-connector-db01",
"connector.class": "io.debezium.connector.mysql.MySqlConnector",
"database.server.id": "1",
"tasks.max": "3",
"database.history.kafka.bootstrap.servers": "172.31.47.152:9092,172.31.38.158:9092,172.31.46.207:9092",
"database.history.kafka.topic": "schema-changes.mysql",
"database.server.name": "mysql-db01",
"database.hostname": "172.31.84.129",
"database.port": "3306",
"database.user": "bhuvi",
"database.password": "my_stong_password",
"database.whitelist": "proddb,test",
"internal.key.converter.schemas.enable": "false",
"key.converter.schemas.enable": "false", …
Run Code Online (Sandbox Code Playgroud)