我想构建一个使用Flask框架开发的python API,该框架使用Kafka主题并将流推送到客户端(html页面或其他应用程序)。
我试图用虚拟数据生成实时流(请参见下面的实时路由)。发生的问题result是仅在循环完成后才推送result变量,而变量应在每次迭代时推送。
我还尝试通过Kafka连接生成实时流(请参见下面的kafka路由)。问题是没有数据返回,而是请求没有完成。
from flask import Response, Flask
import time
from kafka import KafkaConsumer
application = Flask(__name__)
@application.route('/')
def index():
return "Hello, World!"
@application.route('/realtime/')
def realtime():
def createGenerator():
for i in range(1,10):
yield str(i) + '\n'
time.sleep(0.2)
return Response(createGenerator())
@application.route('/kafka/')
def kafkaStream():
consumer = KafkaConsumer(bootstrap_servers = 'serverlocation',
client_id = 'name of client',
auto_offset_reset = 'earliest',
value_deserializer = lambda m: json.loads(m.decode('ascii')))
consumer.subscribe(topics=['my-topic'])
def events():
result = []
for message in consumer:
if message is not None: …Run Code Online (Sandbox Code Playgroud)