一旦所有消息都用完了,如何关闭kafka用户?

pan*_*ish 5 python python-3.x kafka-consumer-api

我有以下程序可以使用所有发送到Kafka的消息。

from kafka import KafkaConsumer

consumer = KafkaConsumer('my_test_topic',
                         group_id='my-group',
                         bootstrap_servers=['my_kafka:9092'])
for message in consumer:
    consumer.commit()
    print ("%s key=%s value=%s" % (message.topic,message.key,
                                          message.value))
KafkaConsumer.close()
Run Code Online (Sandbox Code Playgroud)

使用以上程序,我可以使用所有发送到Kafka的消息。但是,一旦所有消息都消耗掉了,我想关闭没有发生的kafka用户。我同样需要帮助。

pan*_*ish 6

如果我向KafkaConsumer对象提供consumer_timeout_ms参数,则可以立即关闭kafka消费者。它接受超时值(以毫秒为单位)。下面是代码片段。

from kafka import KafkaConsumer

consumer = KafkaConsumer('my_test_topic',
                         group_id='my-group',
                         bootstrap_servers=['my_kafka:9092'],
                         consumer_timeout_ms=1000)
for message in consumer:
    consumer.commit()
    print ("%s key=%s value=%s" % (message.topic,message.key,
                                          message.value))
KafkaConsumer.close()
Run Code Online (Sandbox Code Playgroud)

在上面的代码中,如果消费者在1秒钟内没有看到任何消息,它将关闭会话。

  • 感谢您的回答,似乎您想调用“consumer.close()”而不是“KafkaConsumer.close()”。 (3认同)