mic*_*ele 3 python apache-kafka message-hub ibm-cloud
I am trying to connect to a Bluemix Message Hub instance on http://bluemix.net. This simple script
#!/usr/bin/env python
from kafka import KafkaProducer
from kafka.errors import KafkaError

kafka_brokers_sasl = [
    "kafka01-prod01.messagehub.services.us-south.bluemix.net:9093",
    "kafka02-prod01.messagehub.services.us-south.bluemix.net:9093",
    "kafka03-prod01.messagehub.services.us-south.bluemix.net:9093",
    "kafka04-prod01.messagehub.services.us-south.bluemix.net:9093",
    "kafka05-prod01.messagehub.services.us-south.bluemix.net:9093" ]

sasl_plain_username = "xxxxxxxxxxxxxxx"
sasl_plain_password = "xxxxxxxxxxxxxxxxxxxxxxxxx"
sasl_mechanism = 'SASL_PLAINTEXT'

producer = KafkaProducer(bootstrap_servers = kafka_brokers_sasl,
                         sasl_plain_username = sasl_plain_username,
                         sasl_plain_password = sasl_plain_password,
                         sasl_mechanism = sasl_mechanism )
ends with the following exception:
Traceback (most recent call last):
  File "./test-mh.py", line 12, in <module>
    producer = KafkaProducer(bootstrap_servers = kafka_brokers_sasl, sasl_plain_username = sasl_plain_username, sasl_plain_password = sasl_plain_password, sasl_mechanism = sasl_mechanism )
  File "/usr/local/lib/python2.7/dist-packages/kafka/producer/kafka.py", line 328, in __init__
    **self.config)
  File "/usr/local/lib/python2.7/dist-packages/kafka/client_async.py", line 202, in __init__
    self.config['api_version'] = self.check_version(timeout=check_timeout)
  File "/usr/local/lib/python2.7/dist-packages/kafka/client_async.py", line 791, in check_version
    raise Errors.NoBrokersAvailable()
kafka.errors.NoBrokersAvailable: NoBrokersAvailable
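I took kafka_brokers_sasl, sasl_plain_username and sasl_plain_password from the Message Hub service credentials object. I am using kafka-python 1.3.1, which appears to support the SASL authentication mechanism. Any idea what I am doing wrong? Thanks.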
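Message Hub requires clients to connect over TLS 1.2. That means passing a security_protocol parameter to KafkaProducer, and also an ssl.SSLContext via the ssl_context parameter, because the Python Kafka client appears to create an SSLv23 context by default.
Here are the changes needed to connect: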
import ssl
from kafka import KafkaProducer
from kafka.errors import KafkaError

kafka_brokers_sasl = [
    "kafka01-prod01.messagehub.services.us-south.bluemix.net:9093",
    "kafka02-prod01.messagehub.services.us-south.bluemix.net:9093",
    "kafka03-prod01.messagehub.services.us-south.bluemix.net:9093",
    "kafka04-prod01.messagehub.services.us-south.bluemix.net:9093",
    "kafka05-prod01.messagehub.services.us-south.bluemix.net:9093" ]

sasl_plain_username = "xxxxxxxxxxxxxxx"
sasl_plain_password = "xxxxxxxxxxxxxxxxxxxxxxxxx"
sasl_mechanism = 'PLAIN'  # <-- changed from 'SASL_PLAINTEXT'
security_protocol = 'SASL_SSL'

# Create a new context using system defaults, disable all but TLS1.2
context = ssl.create_default_context()
# OR the flags in; &= would clear every other option bit instead of adding these
context.options |= ssl.OP_NO_TLSv1
context.options |= ssl.OP_NO_TLSv1_1

producer = KafkaProducer(bootstrap_servers = kafka_brokers_sasl,
                         sasl_plain_username = sasl_plain_username,
                         sasl_plain_password = sasl_plain_password,
                         security_protocol = security_protocol,
                         ssl_context = context,
                         sasl_mechanism = sasl_mechanism)
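
To check the TLS setup on its own, without contacting a broker, the context and the connection settings can be built separately and inspected. The following is only a sketch: the helper names make_tls12_context and producer_kwargs are hypothetical and not part of kafka-python; the keyword names are the KafkaProducer parameters used in this answer.

```python
import ssl


def make_tls12_context():
    """Build a client SSLContext that refuses TLS 1.0 and TLS 1.1.

    Hypothetical helper, shown for illustration only.
    """
    context = ssl.create_default_context()
    # Option flags are a bit mask: OR new flags in so that the defaults
    # set by create_default_context() are preserved.
    context.options |= ssl.OP_NO_TLSv1
    context.options |= ssl.OP_NO_TLSv1_1
    return context


def producer_kwargs(brokers, username, password):
    """Collect the keyword arguments to pass to KafkaProducer.

    Hypothetical helper; the keys mirror the parameters used above.
    """
    return {
        "bootstrap_servers": brokers,
        "security_protocol": "SASL_SSL",
        "sasl_mechanism": "PLAIN",
        "sasl_plain_username": username,
        "sasl_plain_password": password,
        "ssl_context": make_tls12_context(),
    }


if __name__ == "__main__":
    ctx = make_tls12_context()
    # Both flags should be present after construction.
    print(bool(ctx.options & ssl.OP_NO_TLSv1))
    print(bool(ctx.options & ssl.OP_NO_TLSv1_1))
```

With kafka-python installed, the result could then be passed as KafkaProducer(**producer_kwargs(kafka_brokers_sasl, sasl_plain_username, sasl_plain_password)). On Python 3.7 and later, setting context.minimum_version = ssl.TLSVersion.TLSv1_2 is an alternative to the option flags.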