BAE*_*BAE 7 python apache-kafka kafka-consumer-api kafka-python
我正在做Python Kafka使用者(试图在http://kafka-python.readthedocs.org/en/latest/apidoc/kafka.consumer.html中使用kafka.consumer.SimpleConsumer或kafka.consumer.simple.SimpleConsumer ).当我运行以下代码时,即使消耗了所有消息,它也会一直运行.我希望消费者在消费所有消息时都会停止.怎么做?另外我不知道如何使用stop()函数(在基类kafka.consumer.base.Consumer中).
UPDATE
我使用信号处理程序来调用consumer.stop().一些错误消息被打印到屏幕上.但程序仍停留在for循环中.当新消息进入时,消费者消费并打印它们.我也尝试过client.close().但结果相同.
我需要一些方法来优雅地停止for循环.
client = KafkaClient("localhost:9092")
consumer = SimpleConsumer(client, "test-group", "test")
consumer.seek(0, 2)# (0,2) and (0,0)
for message in consumer:
print "Offset:", message.offset
print "Value:", message.message.value
Run Code Online (Sandbox Code Playgroud)
欢迎任何帮助.谢谢.
小智 6
我们可以先检查主题中最后一条消息的偏移量。然后,在达到该偏移量时停止循环。
client = "localhost:9092"
consumer = KafkaConsumer(client)
topic = 'test'
tp = TopicPartition(topic,0)
#register to the topic
consumer.assign([tp])
# obtain the last offset value
consumer.seek_to_end(tp)
lastOffset = consumer.position(tp)
consumer.seek_to_beginning(tp)
for message in consumer:
print "Offset:", message.offset
print "Value:", message.message.value
if message.offset == lastOffset - 1:
break
Run Code Online (Sandbox Code Playgroud)
使用iter_timeout参数设置等待时间。如果设置为10,就像下面这段代码,如果10秒内没有新消息进来,就会退出。默认值是None,这意味着即使没有新消息进来,消费者也会阻塞在这里。
self.consumer = SimpleConsumer(self.client, "test-group", "test",
iter_timeout=10)
Run Code Online (Sandbox Code Playgroud)
更新
以上并不是一个好的方法。当大量消息传入时,很难设置足够小的 iter_timeout 来保证停止。所以,现在,我正在使用 get_message() 函数,它尝试消耗一条消息并停止。当没有新消息时,不返回任何内容。