可以与 Zookeeper 交谈,但不能与消息代理交谈

Kil*_*oth 7 python amazon-web-services apache-kafka kafka-producer-api amazon-msk

我正在使用 kafka-python 为 Kafka 2.2.1 集群(来自 AWS 的 MSK 服务的托管集群实例)生成消息。我能够检索引导服务器并与它们建立网络连接,但没有消息通过。相反,在 Type 的每条消息之后,A我立即收到一个 type B... 并最终收到一个type C

A [INFO]    2019-11-19T15:17:19.603Z    <BrokerConnection ... <connecting> [IPv4 ('10.0.128.56', 9094)]>: Connection complete.
B [ERROR]   2019-11-19T15:17:19.605Z    <BrokerConnection ... <connected> [IPv4 ('10.0.128.56', 9094)]>: socket disconnected
C [ERROR] KafkaTimeoutError: KafkaTimeoutError: Failed to update metadata after 60.0 secs.
Run Code Online (Sandbox Code Playgroud)

是什么导致代理节点接受来自有希望的生产者的 TCP 连接,然后立即再次关闭它?

编辑

  • 该主题已存在,并kafka-topics.sh --list显示它。

  • 我用过的所有客户端都遇到同样的问题:Kafka's kafka-console-producer.shkafka-pythonconfluent-kafkakafkacat

  • Kafka 集群与我的所有其他机器在同一个 VPC 中,它的安全组允许该 VPC 内的任何传入和传出流量。

  • 但是,它由 Amazon 的 Managed Streaming for Kafka (MSK) servive 管理,这意味着我无法对服务器安装设置进行精细控制(甚至不知道它们是什么)。MSK 只发布 Zookeeper 和消息代理 URL 供客户端使用。

  • 生产者作为 AWS Lambda 函数运行,但当我在普通 EC2 实例上运行它时问题仍然存在。

  • 权限不是问题。我已经为 lambda 角色分配了它需要的所有 AWS 权限(AWS 总是非常明确地说明哪个操作需要哪个缺少权限)。

  • 连接不是问题。我可以使用标准 telnet 访问 zookeeper 和消息代理的 URL。然而,向zookeeper 发出命令是有效的,而向消息代理发出命令最终总是失败。由于Kafka 在 TCP 上使用二进制协议,我不知道如何进一步调试问题。

编辑

按照建议,我调试了这个

./kafkacat -b $BROKERS -L -d 经纪人

并得到:

7|1574772202.379|FEATURE|rdkafka#producer-1| [thrd:HOSTNAME]: HOSTNAME:9094/bootstrap: Updated enabled protocol features +ApiVersion to ApiVersion
%7|1574772202.379|STATE|rdkafka#producer-1| [thrd:HOSTNAME]: HOSTNAME:9094/bootstrap: Broker changed state CONNECT -> APIVERSION_QUERY
%7|1574772202.379|BROKERFAIL|rdkafka#producer-1| [thrd:HOSTNAME]: HOSTNAME:9094/bootstrap: failed: err: Local: Broker transport failure: (errno: Operation now in progress)
%7|1574772202.379|FEATURE|rdkafka#producer-1| [thrd:HOSTNAME]: HOSTNAME:9094/bootstrap: Updated enabled protocol features -ApiVersion to
%7|1574772202.380|STATE|rdkafka#producer-1| [thrd:HOSTNAME]: HOSTNAME:9094/bootstrap: Broker changed state APIVERSION_QUERY -> DOWN
Run Code Online (Sandbox Code Playgroud)

那么,这是客户端和代理 API 版本之间的一种不匹配吗?请记住,我无法控制 AWS 提供的 Kafka 集群的版本或配置,我该如何从中恢复?

Gar*_*man 6

我认为这与 TLS 加密有关。默认情况下,MSK 启动一个接受 PLAINTEXT 和 TLS 的集群,但如果您以编程方式从集群中获取引导服务器,它只会为您提供 TLS 端口。如果是这种情况,请尝试改用 PLAINTEXT 端口 9092。

要为 TLS 验证客户端,您需要生成一个证书:https : //docs.aws.amazon.com/msk/latest/developerguide/msk-authentication.html然后需要将此证书放到您的 lambda 上并参考生产者配置中的证书。

如果您只能将 MSK 集群配置为 PLAINTEXT,那么当您从 AWS 开发工具包获取引导服务器时,它将为您提供 PLAINTEXT 端口,您应该没问题。