Kil*_*oth 7 python amazon-web-services apache-kafka kafka-producer-api amazon-msk
我正在使用 kafka-python 为 Kafka 2.2.1 集群(来自 AWS 的 MSK 服务的托管集群实例)生成消息。我能够检索引导服务器并与它们建立网络连接,但没有消息通过。相反,在 Type 的每条消息之后,A我立即收到一个 type B... 并最终收到一个type C:
A [INFO] 2019-11-19T15:17:19.603Z <BrokerConnection ... <connecting> [IPv4 ('10.0.128.56', 9094)]>: Connection complete.
B [ERROR] 2019-11-19T15:17:19.605Z <BrokerConnection ... <connected> [IPv4 ('10.0.128.56', 9094)]>: socket disconnected
C [ERROR] KafkaTimeoutError: KafkaTimeoutError: Failed to update metadata after 60.0 secs.
Run Code Online (Sandbox Code Playgroud)
是什么导致代理节点接受来自有希望的生产者的 TCP 连接,然后立即再次关闭它?
编辑
该主题已存在,并kafka-topics.sh --list显示它。
我用过的所有客户端都遇到同样的问题:Kafka's kafka-console-producer.sh、kafka-python、confluent-kafka和kafkacat
Kafka 集群与我的所有其他机器在同一个 VPC 中,它的安全组允许该 VPC 内的任何传入和传出流量。
但是,它由 Amazon 的 Managed Streaming for Kafka (MSK) servive 管理,这意味着我无法对服务器安装设置进行精细控制(甚至不知道它们是什么)。MSK 只发布 Zookeeper 和消息代理 URL 供客户端使用。
生产者作为 AWS Lambda 函数运行,但当我在普通 EC2 实例上运行它时问题仍然存在。
权限不是问题。我已经为 lambda 角色分配了它需要的所有 AWS 权限(AWS 总是非常明确地说明哪个操作需要哪个缺少权限)。
连接不是问题。我可以使用标准 telnet 访问 zookeeper 和消息代理的 URL。然而,向zookeeper 发出命令是有效的,而向消息代理发出命令最终总是失败。由于Kafka 在 TCP 上使用二进制协议,我不知道如何进一步调试问题。
编辑
按照建议,我调试了这个
./kafkacat -b $BROKERS -L -d 经纪人
并得到:
7|1574772202.379|FEATURE|rdkafka#producer-1| [thrd:HOSTNAME]: HOSTNAME:9094/bootstrap: Updated enabled protocol features +ApiVersion to ApiVersion
%7|1574772202.379|STATE|rdkafka#producer-1| [thrd:HOSTNAME]: HOSTNAME:9094/bootstrap: Broker changed state CONNECT -> APIVERSION_QUERY
%7|1574772202.379|BROKERFAIL|rdkafka#producer-1| [thrd:HOSTNAME]: HOSTNAME:9094/bootstrap: failed: err: Local: Broker transport failure: (errno: Operation now in progress)
%7|1574772202.379|FEATURE|rdkafka#producer-1| [thrd:HOSTNAME]: HOSTNAME:9094/bootstrap: Updated enabled protocol features -ApiVersion to
%7|1574772202.380|STATE|rdkafka#producer-1| [thrd:HOSTNAME]: HOSTNAME:9094/bootstrap: Broker changed state APIVERSION_QUERY -> DOWN
Run Code Online (Sandbox Code Playgroud)
那么,这是客户端和代理 API 版本之间的一种不匹配吗?请记住,我无法控制 AWS 提供的 Kafka 集群的版本或配置,我该如何从中恢复?
我认为这与 TLS 加密有关。默认情况下,MSK 启动一个接受 PLAINTEXT 和 TLS 的集群,但如果您以编程方式从集群中获取引导服务器,它只会为您提供 TLS 端口。如果是这种情况,请尝试改用 PLAINTEXT 端口 9092。
要为 TLS 验证客户端,您需要生成一个证书:https : //docs.aws.amazon.com/msk/latest/developerguide/msk-authentication.html然后需要将此证书放到您的 lambda 上并参考生产者配置中的证书。
如果您只能将 MSK 集群配置为 PLAINTEXT,那么当您从 AWS 开发工具包获取引导服务器时,它将为您提供 PLAINTEXT 端口,您应该没问题。