标签: amazon-msk

Kafka Connect 与 Amazon MSK

如何将 Kafka Connect 适配器与 Amazon MSK 配合使用?根据 AWS 文档,它支持 Kafka 连接,但没有记录如何设置适配器和使用它。

apache-kafka apache-kafka-connect amazon-msk

10
推荐指数
2
解决办法
6930
查看次数

可以与 Zookeeper 交谈,但不能与消息代理交谈

我正在使用 kafka-python 为 Kafka 2.2.1 集群(来自 AWS 的 MSK 服务的托管集群实例)生成消息。我能够检索引导服务器并与它们建立网络连接,但没有消息通过。相反,在 Type 的每条消息之后,A我立即收到一个 type B... 并最终收到一个type C

A [INFO]    2019-11-19T15:17:19.603Z    <BrokerConnection ... <connecting> [IPv4 ('10.0.128.56', 9094)]>: Connection complete.
B [ERROR]   2019-11-19T15:17:19.605Z    <BrokerConnection ... <connected> [IPv4 ('10.0.128.56', 9094)]>: socket disconnected
C [ERROR] KafkaTimeoutError: KafkaTimeoutError: Failed to update metadata after 60.0 secs.
Run Code Online (Sandbox Code Playgroud)

是什么导致代理节点接受来自有希望的生产者的 TCP 连接,然后立即再次关闭它?

编辑

  • 该主题已存在,并kafka-topics.sh --list显示它。

  • 我用过的所有客户端都遇到同样的问题:Kafka's kafka-console-producer.shkafka-pythonconfluent-kafkakafkacat

  • Kafka 集群与我的所有其他机器在同一个 VPC 中,它的安全组允许该 VPC 内的任何传入和传出流量。

  • 但是,它由 Amazon 的 Managed Streaming …

python amazon-web-services apache-kafka kafka-producer-api amazon-msk

7
推荐指数
1
解决办法
1620
查看次数

使用 AWS MSK NOT_ENOUGH_REPLICAS 的 Debezium

我在 AWS 中有一个正在运行的 debezium 集群,没有问题。我想尝试一下 AWS MSK。所以我启动了一个集群。然后我启动了一个 EC2 来运行我的连接器。

然后安装 confluent-kafka

sudo apt-get update && sudo apt-get install confluent-platform-2.12
Run Code Online (Sandbox Code Playgroud)

默认情况下,AWS MSK 没有架构注册表,所以我从连接器 EC2 架构注册表 conf 文件配置了它:

kafkastore.connection.url=z-1.bhuvi-XXXXXXXXX.amazonaws.com:2181,z-3.bhuvi-XXXXXXXXX.amazonaws.com:2181,z-2.bhuvi-XXXXXXXXX.amazonaws.com:2181


kafkastore.bootstrap.servers=PLAINTEXT://b-2.bhuvi-XXXXXXXXX.amazonaws.com:9092,PLAINTEXT://b-4.bhuvi-XXXXXXXXX.amazonaws.com:9092,PLAINTEXT://b-1.bhuvi-XXXXXXXXX.amazonaws.com:9092
Run Code Online (Sandbox Code Playgroud)

然后/etc/kafka/connect-distributed.properties归档

bootstrap.servers=b-4.bhuvi-XXXXXXXXX.amazonaws.com:9092,b-3.bhuvi-XXXXXXXXX.amazonaws.com:9092,b-2.bhuvi-XXXXXXXXX.amazonaws.com:9092

plugin.path=/usr/share/java,/usr/share/confluent-hub-components
Run Code Online (Sandbox Code Playgroud)

安装连接器:

confluent-hub install debezium/debezium-connector-mysql:latest
Run Code Online (Sandbox Code Playgroud)

启动服务

systemctl start confluent-schema-registry
systemctl start confluent-connect-distributed
Run Code Online (Sandbox Code Playgroud)

现在一切都开始了。然后我创建了一个 mysql.json 文件。

{
    "name": "mysql-connector-db01",
    "config": {
        "name": "mysql-connector-db01",
        "connector.class": "io.debezium.connector.mysql.MySqlConnector",
        "database.server.id": "1",
        "tasks.max": "3",
        "database.history.kafka.bootstrap.servers": "172.31.47.152:9092,172.31.38.158:9092,172.31.46.207:9092",
        "database.history.kafka.topic": "schema-changes.mysql",
        "database.server.name": "mysql-db01",
        "database.hostname": "172.31.84.129",
        "database.port": "3306",
        "database.user": "bhuvi",
        "database.password": "my_stong_password",
        "database.whitelist": "proddb,test",
        "internal.key.converter.schemas.enable": "false",
        "key.converter.schemas.enable": "false", …
Run Code Online (Sandbox Code Playgroud)

amazon-web-services apache-kafka debezium amazon-msk

7
推荐指数
1
解决办法
3592
查看次数