NoBrokersAvailable: NoBrokersAvailable error in Kafka

Lou*_*och 4 python apache-kafka

I ran into a "NoBrokersAvailable: NoBrokersAvailable" error in a Jupyter notebook with the following code:

import json
import logging
import time

from kafka import KafkaProducer
from kafka.errors import KafkaError

log = logging.getLogger(__name__)

def on_send_success(record_metadata):
    # Called once the broker acknowledges the record.
    print(record_metadata.topic)
    print(record_metadata.partition)
    print(record_metadata.offset)

def on_send_error(excp):
    log.error('I am an errback', exc_info=excp)
    # handle exception

producer = KafkaProducer(bootstrap_servers=['localhost:9092'], value_serializer=lambda m: json.dumps(m).encode('utf-8'))
INTERVAL = 10  # seconds between polls
while True:
    # get_realtime_stock is defined elsewhere (not shown in the post).
    data_points = get_realtime_stock('AAPL')
    data = {'updated_on': data_points['updated_on'], 'ticker': data_points['security']['ticker'], 'last_price': data_points['last_price']}
    message = data_points
    producer.send('data1', value=data).add_callback(on_send_success).add_errback(on_send_error)
    time.sleep(INTERVAL)

Here is the corresponding error:

---------------------------------------------------------------------------
NoBrokersAvailable                        Traceback (most recent call last)
<ipython-input-8-cab724428b84> in <module>
     11     # handle exception
     12 
---> 13 producer = KafkaProducer(bootstrap_servers=['localhost:9092'], value_serializer=lambda m: json.dumps(m).encode('utf-8'))
     14 INTERVAL =10
     15 while True:

~/anaconda3/lib/python3.7/site-packages/kafka/producer/kafka.py in __init__(self, **configs)
    379         client = KafkaClient(metrics=self._metrics, metric_group_prefix='producer',
    380                              wakeup_timeout_ms=self.config['max_block_ms'],
--> 381                              **self.config)
    382 
    383         # Get auto-discovered version from client if necessary

~/anaconda3/lib/python3.7/site-packages/kafka/client_async.py in __init__(self, **configs)
    237         if self.config['api_version'] is None:
    238             check_timeout = self.config['api_version_auto_timeout_ms'] / 1000
--> 239             self.config['api_version'] = self.check_version(timeout=check_timeout)
    240 
    241     def _can_bootstrap(self):

~/anaconda3/lib/python3.7/site-packages/kafka/client_async.py in check_version(self, node_id, timeout, strict)
    890         else:
    891             self._lock.release()
--> 892             raise Errors.NoBrokersAvailable()
    893 
    894     def wakeup(self):

NoBrokersAvailable: NoBrokersAvailable

The code used to work fine, but it suddenly stopped working. Does anyone know what the problem might be?
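
The traceback shows the exception being raised from `check_version`, which runs when no `api_version` is configured and the client cannot probe any broker. One thing worth ruling out first is whether anything is listening at the bootstrap address at all. The following is a minimal sketch using only the standard library; `broker_reachable` is a hypothetical helper name, and `localhost:9092` is the address from the code above. A successful TCP connection does not prove the broker is healthy or correctly configured, only that the port is open.

```python
import socket

def broker_reachable(host, port, timeout_s=3.0):
    """Return True if a TCP connection to host:port can be opened."""
    try:
        # create_connection raises OSError on refusal, timeout, or DNS failure.
        with socket.create_connection((host, port), timeout=timeout_s):
            return True
    except OSError:
        return False

# Usage (assumes the bootstrap address from the question):
# print(broker_reachable("localhost", 9092))
```

If this returns False, the broker process is probably not running or is listening on a different host or port.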

Ska*_* HR 14

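I ran into the same error and solved it by specifying the API version in the KafkaProducer constructor. Here is a sample of my code.

If the error persists, please specify the version of your kafka-python library.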

producer = KafkaProducer(
    bootstrap_servers=#####,
    client_id=######,
    value_serializer=JsonSerializer.serialize,
    api_version=(0, 10, 1)
)

For the API version, you should enter your Kafka version as a tuple, as in (0, 10, 1) above.
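
As a rough sketch of the advice above, the snippet below turns a broker version string into the tuple form that `api_version` takes and builds the keyword arguments for `KafkaProducer`. The helper names `parse_api_version`, `producer_config`, and `make_producer` are hypothetical, the `localhost:9092` address and JSON serializer are borrowed from the question, and which version tuples are recognized may depend on your kafka-python release.

```python
import json

def parse_api_version(version):
    """Turn a version string such as '0.10.1' into the tuple (0, 10, 1)."""
    return tuple(int(part) for part in version.strip().split("."))

def producer_config(broker_version, servers=("localhost:9092",)):
    """Build KafkaProducer keyword arguments with an explicit api_version."""
    return {
        "bootstrap_servers": list(servers),
        "value_serializer": lambda m: json.dumps(m).encode("utf-8"),
        "api_version": parse_api_version(broker_version),
    }

def make_producer(broker_version):
    # Imported here so the helpers above work without kafka-python installed.
    from kafka import KafkaProducer
    return KafkaProducer(**producer_config(broker_version))

# Usage (assumes a broker running Kafka 0.10.1 on localhost:9092):
# producer = make_producer("0.10.1")
```

With `api_version` set explicitly, the client skips the automatic version probe shown in the traceback (`if self.config['api_version'] is None`), so this particular code path no longer raises. The broker still has to be reachable for messages to be delivered.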