Flask API作为实时Kafka使用者

ste*_*fan 5 python flask apache-kafka

我想构建一个使用Flask框架开发的python API,该框架使用Kafka主题并将流推送到客户端(html页面或其他应用程序)。

我试图用虚拟数据生成实时流(请参见下面的实时路由)。发生的问题result是仅在循环完成后才推送result变量,而变量应在每次迭代时推送。

我还尝试通过Kafka连接生成实时流(请参见下面的kafka路由)。问题是没有数据返回,而是请求没有完成。

from flask import Response, Flask
import time
from kafka import KafkaConsumer

application = Flask(__name__)

@application.route('/')
def index():
    return "Hello, World!"


@application.route('/realtime/')
def realtime():

    def createGenerator():

        for i in range(1,10):
            yield str(i) + '\n'
            time.sleep(0.2)

    return Response(createGenerator())


@application.route('/kafka/')
def kafkaStream():
    consumer = KafkaConsumer(bootstrap_servers = 'serverlocation',
                     client_id = 'name of client',
                     auto_offset_reset = 'earliest',
                     value_deserializer = lambda m: json.loads(m.decode('ascii')))

    consumer.subscribe(topics=['my-topic'])

    def events():
        result = []
        for message in consumer:
           if message is not None:
               result.append(message.value)
           yield result
    return Response(events())

if __name__ == '__main__':
    application.run(debug = True)
Run Code Online (Sandbox Code Playgroud)

到目前为止,我有效地从Kafka接收数据的唯一方法是在控制台中打印结果。

from kafka import KafkaConsumer
consumer = KafkaConsumer(bootstrap_servers = 'serverlocation',
                     client_id = 'name of client',
                     auto_offset_reset = 'earliest',
                     value_deserializer = lambda m: json.loads(m.decode('ascii')))

consumer.subscribe(topics=['my-topic'])

for message in consumer:
    print message
Run Code Online (Sandbox Code Playgroud)

我认为问题在于,API不能在过程完成之前推送数据,并且由于KafkaConsumer连接是无限的,因此没有任何内容推送到客户端。

我该如何克服这个问题?

小智 0

对于任何其他正在寻找此问题解决方案的人。基于套接字的解决方案将在 kafka 消费者之上工作,该消费者要么不断侦听队列中的消息并发布消息。

检查此链接以获取更多信息。 kafka套接字