使用 kafka-python 发送数据仅在短暂延迟代码时有效

ves*_*che 6 python python-2.7 apache-kafka

我正在使用 kafka-python 向 Kafka 主题发送一些数据。我挣扎了一段时间无法将数据发送到我的 Kafka 主题,直到我发现如果我短暂延迟代码它就可以工作。

from kafka import KafkaProducer
from time import sleep

producer = KafkaProducer(bootstrap_servers="localhost:9092")
producer.send("topic", "foo")
sleep(.1)
Run Code Online (Sandbox Code Playgroud)

此代码适合我,而无需使用工作sleep(.1)。这就像发送数据需要时间来让它正常工作一样。kafka-python 客户端中是否有处理此问题的内容?或者更好的解决方案?

Tim*_*m D 6

一年后,但对于任何看到这一点的人来说,下面是一个解决方案。这里的问题是脚本结束和发送调用的竞争条件,这就是 sleep() 命令起作用的原因。

kafka 模块应该更好地处理 python 出口,或者至少输出一些标准输出/错误,所以这种行为不是沉默的。

来自 kafka-python github:

# Block until a single message is sent (or timeout)
future = producer.send('foobar', b'another_message')
result = future.get(timeout=60)
Run Code Online (Sandbox Code Playgroud)

现在您可以保证您的脚本将阻塞,直到消息被确认发布。