Iva*_*van 3 python apache-kafka kafka-python
我使用 Kafka 2.12 和 kafka-python 模块作为 Kafka 客户端。我正在尝试测试一个简单的生产者:
class Producer(Process):
daemon = True
def run(self):
producer = KafkaProducer(bootstrap_servers='kafka:9092')
print("Sending messages...")
producer.send('topic', json.dumps(message).encode('utf-8'))
Run Code Online (Sandbox Code Playgroud)
当这个过程被实例化时,消费者永远不会收到消息
如果我刷新生产者并更改 linger_ms 参数(使其同步),则消息将由消费者发送和读取:
class Producer(Process):
daemon = True
def run(self):
producer = KafkaProducer(bootstrap_servers='kafka:9092', linger_ms=10)
print("Sending messages...")
producer.send('topic', json.dumps(message).encode('utf-8'))
producer.flush()
Run Code Online (Sandbox Code Playgroud)
在之前的 Kafka 版本中,有参数 queue.buffering.max.ms 来指定生产者将等待多长时间才能将消息发送到队列中,但在最新版本(kafka-python 1.3.3)中不存在。我如何在较新的 Kafka 版本中指定它以保持我的通信异步?
谢谢!
正如您所观察到的,消息排队等待异步发送,并且不能保证它会立即发送。因此,如果要强制将消息发送到代理,则需要显式调用producer.flush()which 将阻塞直到消息发送(尽管flush()不保证 acks)。
注意:因为flush()是阻塞调用,所以通常只推荐用于低吞吐量系统或应用程序关闭时。同步发送与异步发送的吞吐量命中对于大容量系统通常是不可行的。我的经验是,生产者通常无需调用flush()即可快速发送,除了测试套件/开发需要立即发生的地方。
我很确定参数queue.buffering.max.ms被替换为linger_ms:https : //kafka-python.readthedocs.io/en/master/apidoc/KafkaProducer.html#kafka.KafkaProducer
因此,您已经在工作示例中使用了该参数。
producer = KafkaProducer(bootstrap_servers='kafkaIp:kafkaPort')
producer.send("topic_name", b'Your string here')
producer.flush()
Run Code Online (Sandbox Code Playgroud)
使用发送和刷新。
| 归档时间: |
|
| 查看次数: |
5637 次 |
| 最近记录: |