卡夫卡生产者刷新和轮询之间的区别

sha*_*eel 13 python apache-kafka kafka-producer-api confluent-platform

我们有一个 Kafka 消费者,它将读取消息并执行这些操作,然后使用以下脚本再次发布到 Kafka 主题

生产者配置:

{
  "bootstrap.servers": "localhost:9092"
}
Run Code Online (Sandbox Code Playgroud)

我还没有配置任何其他配置,如 queue.buffering.max.messages queue.buffering.max.ms batch.num.messages

我假设这些都将成为配置中的默认值

queue.buffering.max.messages : 100000
queue.buffering.max.ms : 0
batch.num.messages : 10000
Run Code Online (Sandbox Code Playgroud)

我的理解:当内部队列达到 queue.buffering.max.ms 或 batch.num.messages 中的任何一个时,消息将在单独的线程中发布到 Kafka。在我的配置中 queue.buffering.max.ms 是 0,所以当我调用 generate() 时每条消息都会被发布。如果我错了,请纠正我。

我的制作人片段:

def send(topic, message):
    p.produce(topic, json.dumps(message), callback=delivery_callback(err, msg))
    p.flush()
Run Code Online (Sandbox Code Playgroud)

这篇文章我了解到,在每条消息后使用刷新,生产者将成为同步生产者。如果我使用上面的脚本,发布到 Kafka 需要大约 45 毫秒

如果我将上面的代码段更改为

def send(topic, message):
    p.produce(topic, json.dumps(message), callback=delivery_callback(err, msg))
    p.poll(0)
Run Code Online (Sandbox Code Playgroud)

有什么性能会提高吗?你能澄清一下我的理解吗?

谢谢

Mic*_*son 16

flush()和之间的区别在poll()客户的文档中进行了解释。

对于flush(),它指出:

等待生产者队列中的所有消息被传递。这是一个调用 poll() 的便捷方法,直到 len() 为零或可选的超时时间过去。

对于poll()

轮询生产者的事件并调用相应的回调(如果已注册)。

poll()在 a 之后调用send()不会使生产者同步,因为刚发送的消息不太可能已经到达代理并且交付报告已经发送回客户端。

而是flush()会阻塞,直到先前发送的消息已交付(或出错),从而有效地使生产者同步。

  • 感谢您提供信息。它确实对我有用。时间从 45ms 缩短至 ~0.2ms (2认同)
  • `flush()` 将等待发送报告被确认,这比 `poll(0)` 长得多,后者仅尝试获取先前发送消息的发送报告。如果有人对细节感兴趣,[本期](https://github.com/confluenceinc/confluence-kafka-python/issues/137)值得一读。 (2认同)