我使用一个简单的 Flask 应用程序和 gunicorn 的 gevent worker 来服务服务器发送的事件。
要流式传输内容,我使用:
response = Response(eventstream(), mimetype="text/event-stream")
Run Code Online (Sandbox Code Playgroud)
从 redis 流事件:
def eventstream():
for message in pubsub.listen():
# ...
yield str(event)
Run Code Online (Sandbox Code Playgroud)
部署:
gunicorn -k gevent -b 127.0.0.1:50008 flaskapplication
Run Code Online (Sandbox Code Playgroud)
但是在使用一段时间后,我打开了 50 个 redis 连接,即使没有人再连接到服务器发送的事件流。
似乎视图不会终止,因为 gunicorn 是非阻塞的,而 pubsub.listen() 是阻塞的。
我怎样才能解决这个问题?我应该限制gunicorn可能产生的进程数量,还是应该在超时后烧瓶杀死视图?如果可能,它应该在不活动时停止视图/redis 连接,而不断开仍然连接到 SSE 流的用户。
使用Timeoutnatdempk 解决方案中的对象,最优雅的解决方案是发送心跳,以检测死连接:
while True:
pubsub = redis.pubsub()
try:
with Timeout(30) as timeout:
for message in pubsub.listen():
# ...
yield str(event)
timeout.cancel()
timeout.start()
except Timeout, t:
if t is not timeout:
raise
else:
yield ":\n\n" # heartbeat
Run Code Online (Sandbox Code Playgroud)
注意,需要redis.pubsub()再次调用,因为异常后redis连接丢失,会报错NoneType object has no attribute readline。