Python Producer 可以通过 shell 发送,但不能通过 .py

Tex*_*Tex 1 python python-3.x apache-kafka kafka-python

我有一个正在运行并经过测试的 Kafka 集群,我正在尝试使用 Python 脚本向代理发送消息。这在我使用 Python3 shell 并调用生产者方法时有效,但是当我将这些相同的命令放入 python 文件并执行它时 - 脚本似乎挂起。

我正在为消费者和生产者使用 kafka-python 库。当我使用 Python3 shell 时,我可以看到使用 Kafka GUI 工具 2.0.4 的主题中出现的消息我在 python 代码中尝试了各种循环和语句,但似乎没有任何东西让它“运行”完成。

>>>from kafka import KafkaProducer
>>>producer = KafkaProducer(bootstrap_servers='BOOTSTRAP_SRV:9092')
>>>producer.send('MyTopic', b'Has this worked?')
>>>>>><kafka.producer.future.FutureRecordMetadata object at 0x7f7af9ece048>
Run Code Online (Sandbox Code Playgroud)

这有效并且字节出现在代理主题数据中。

当我将与上面相同的代码放在 python .py 文件中并使用 Python3 执行时,它会完成,但没有数据发送到 Kafka 代理。也没有显示错误。

from kafka import KafkaProducer
producer = KafkaProducer(bootstrap_servers='BOOTSTRAP_SRV:9092')
producer.send('MyTopic', b'Some Data to Check')
Run Code Online (Sandbox Code Playgroud)

cri*_*007 5

如您所见,它返回一个未来。

Kafka 客户端将批量记录,他们不会一次立即发送一条记录,要做到这一点,您需要等待或刷新生产者缓冲区,以便它在应用程序退出之前发送。换句话说,交互式终端将生产者数据保存在内存中,在后台运行,反之则丢弃该数据

作为文档,显示

future = producer.send(...)

try:
    record_metadata = future.get(timeout=10)
except KafkaError:
    # Decide what to do if produce request failed...
    log.exception()
    pass
Run Code Online (Sandbox Code Playgroud)

或者producer.flush(),如果你不关心元数据或抓住未来。