小编mdo*_*doe的帖子

Kafka python 消费者在并行线程中运行

我是 python 和 kafka 的新手。我有一个脚本应该启动三个 kafka 消费者,等待来自这些消费者的消息并执行一些其他操作。此时我什至不知道我是否朝着正确的方向前进,所以任何帮助将不胜感激。

class MainClass():
    def do_something_before(self):
        # something is done here

    def start_consumer(self):
        consumer1_thread = threading.Thread(target=self.cons1, args=())
        consumer2_thread = threading.Thread(target=self.cons2, args=())
        consumer1_thread.daemon = True
        consumer2_thread.daemon = True
        consumer1_thread.start()
        consumer2_thread.start()

    def cons1(self):
        consumer = KafkaConsumer(bootstrap_servers='localhost:9092',
                                 auto_offset_reset='earliest')
        consumer.subscribe(['my-topic'])
        for message in consumer:
            print(message.value)

    def cons2(self):
        consumer = KafkaConsumer(bootstrap_servers='localhost:9092',
                                 auto_offset_reset='earliest')
        consumer.subscribe(['my2-topic'])
        for message in consumer:
            print(message.value)

    def keep_working(self):
        # something is done here

if __name__ == 'main':
    g = MainClass()
    g.do_something_before()
    g.keep_working()
Run Code Online (Sandbox Code Playgroud)

python apache-kafka kafka-python

2
推荐指数
1
解决办法
1万
查看次数

标签 统计

apache-kafka ×1

kafka-python ×1

python ×1