Jim*_*lay 1 python apache-kafka kafka-python
我有一个生产者代码,正在向 Kafka 发送消息。直到昨天我才能发送消息。从今天开始,我无法发送消息。不确定是否是版本兼容问题。没有失败或错误消息,代码被执行,但没有发送消息。
以下是 Python 模块版本:
kafka-python==2.0.1
Python 3.8.2
下面是我的代码:
from kafka import KafkaProducer
import logging
logging.basicConfig(level=logging.INFO)
producer = KafkaProducer(bootstrap_servers='127.0.0.1:9092')
producer.send('Jim_Topic', b'Message from PyCharm')
producer.send('Jim_Topic', key=b'message-two', value=b'This is Kafka-Python')
Run Code Online (Sandbox Code Playgroud)
我也尝试记录行为,但不知道为什么生产者被关闭:
INFO:kafka.conn:<BrokerConnection node_id=bootstrap-0 host=127.0.0.1:9092 <connecting> [IPv4 ('127.0.0.1', 9092)]>: connecting to 127.0.0.1:9092 [('127.0.0.1', 9092) IPv4]
INFO:kafka.conn:<BrokerConnection node_id=bootstrap-0 host=127.0.0.1:9092 <connecting> [IPv4 ('127.0.0.1', 9092)]>: Connection complete.
INFO:kafka.producer.kafka:Closing the Kafka producer with 0 secs timeout.
INFO:kafka.producer.kafka:Proceeding to force close the producer since pending requests could not be completed within timeout 0.
INFO:kafka.producer.kafka:Kafka producer closed
Process finished with exit code 0
Run Code Online (Sandbox Code Playgroud)
producer.flush()最后添加帮助我解决了问题。在实际提交事务之前,任何未完成的消息都将被刷新(传递)
from kafka import KafkaProducer
import logging
logging.basicConfig(level=logging.INFO)
producer = KafkaProducer(bootstrap_servers='127.0.0.1:9092')
producer.send('Jim_Topic', b'Message from PyCharm')
producer.send('Jim_Topic', key=b'message-two', value=b'This is Kafka-Python')
producer.flush()
Run Code Online (Sandbox Code Playgroud)