pan*_*ish 5 python python-3.x kafka-consumer-api
我有以下程序可以使用所有发送到Kafka的消息。
from kafka import KafkaConsumer
consumer = KafkaConsumer('my_test_topic',
group_id='my-group',
bootstrap_servers=['my_kafka:9092'])
for message in consumer:
consumer.commit()
print ("%s key=%s value=%s" % (message.topic,message.key,
message.value))
KafkaConsumer.close()
Run Code Online (Sandbox Code Playgroud)
使用以上程序,我可以使用所有发送到Kafka的消息。但是,一旦所有消息都消耗掉了,我想关闭没有发生的kafka用户。我同样需要帮助。
如果我向KafkaConsumer对象提供consumer_timeout_ms参数,则可以立即关闭kafka消费者。它接受超时值(以毫秒为单位)。下面是代码片段。
from kafka import KafkaConsumer
consumer = KafkaConsumer('my_test_topic',
group_id='my-group',
bootstrap_servers=['my_kafka:9092'],
consumer_timeout_ms=1000)
for message in consumer:
consumer.commit()
print ("%s key=%s value=%s" % (message.topic,message.key,
message.value))
KafkaConsumer.close()
Run Code Online (Sandbox Code Playgroud)
在上面的代码中,如果消费者在1秒钟内没有看到任何消息,它将关闭会话。
| 归档时间: |
|
| 查看次数: |
3176 次 |
| 最近记录: |