如果生成的记录失败,我想设置要触发的回调。最初,我只想记录失败的记录。
Confluent Kafka python 库提供了一种添加回调的机制:
produce(topic[, value][, key][, partition][, on_delivery][, timestamp])
...
on_delivery(err,msg) (func) – Delivery report callback to call (from poll() or flush()) on successful or failed delivery
Run Code Online (Sandbox Code Playgroud)
如何使用 kafka-python kafka.KafkaProducer#send()实现类似的行为,而不必使用已弃用的 SimpleClient 使用 kafka.SimpleClient#send_produce_request()
虽然它没有记录,但这相对简单。每当您发送消息时,您都会立即得到Future回复。您可以将回调/错误回复附加到Future:
F = producer.send(topic=topic, value=message, key=key)
F.add_callback(callback, message=message, **kwargs_to_pass_to_callback_method)
F.add_errback(erback, message=message, **kwargs_to_pass_to_errback_method)
Run Code Online (Sandbox Code Playgroud)
相关源代码在这里:https : //github.com/dpkp/kafka-python/blob/1937ce59b4706b44091bb536a9b810ae657c3225/kafka/future.py#L48-L64
我们真的应该记录下来,我提交了https://github.com/dpkp/kafka-python/issues/1256来跟踪它。
| 归档时间: |
|
| 查看次数: |
3093 次 |
| 最近记录: |