如何在程序中停止Python Kafka Consumer?

BAE*_*BAE 7 python apache-kafka kafka-consumer-api kafka-python

我正在做Python Kafka使用者(试图在http://kafka-python.readthedocs.org/en/latest/apidoc/kafka.consumer.html中使用kafka.consumer.SimpleConsumer或kafka.consumer.simple.SimpleConsumer ).当我运行以下代码时,即使消耗了所有消息,它也会一直运行.我希望消费者在消费所有消息时都会停止.怎么做?另外我不知道如何使用stop()函数(在基类kafka.consumer.base.Consumer中).

UPDATE

我使用信号处理程序来调用consumer.stop().一些错误消息被打印到屏幕上.但程序仍停留在for循环中.当新消息进入时,消费者消费并打印它们.我也尝试过client.close().但结果相同.

我需要一些方法来优雅地停止for循环.

        client = KafkaClient("localhost:9092")
        consumer = SimpleConsumer(client, "test-group", "test")

        consumer.seek(0, 2)# (0,2) and (0,0)

        for message in consumer:
            print "Offset:", message.offset
            print "Value:", message.message.value
Run Code Online (Sandbox Code Playgroud)

欢迎任何帮助.谢谢.

小智 6

我们可以先检查主题中最后一条消息的偏移量。然后,在达到该偏移量时停止循环。

    client = "localhost:9092"
    consumer = KafkaConsumer(client)
    topic = 'test'
    tp = TopicPartition(topic,0)
    #register to the topic
    consumer.assign([tp])

    # obtain the last offset value
    consumer.seek_to_end(tp)
    lastOffset = consumer.position(tp)

    consumer.seek_to_beginning(tp)        

    for message in consumer:
        print "Offset:", message.offset
        print "Value:", message.message.value
        if message.offset == lastOffset - 1:
            break
Run Code Online (Sandbox Code Playgroud)


BAE*_*BAE 2

使用iter_timeout参数设置等待时间。如果设置为10,就像下面这段代码,如果10秒内没有新消息进来,就会退出。默认值是None,这意味着即使没有新消息进来,消费者也会阻塞在这里。

        self.consumer = SimpleConsumer(self.client, "test-group", "test",
                iter_timeout=10)
Run Code Online (Sandbox Code Playgroud)

更新

以上并不是一个好的方法。当大量消息传入时,很难设置足够小的 iter_timeout 来保证停止。所以,现在,我正在使用 get_message() 函数,它尝试消耗一条消息并停止。当没有新消息时,不返回任何内容。