Flask:后台线程将非空队列视为空

Ada*_*tan 11 python multithreading flask uwsgi

当我在uwsgi中运行Flask应用程序时,后台线程和应用程序函数在查询同一队列的大小时会看到不同的值.

组件

  • 带有线程安全队列的 Flask应用程序.
  • 一个GET调用返回队列的大小.
  • POST呼叫添加到队列的元素.
  • 后台线程打印队列大小

问题

当应用程序来自shell时python tester.py,我得到了预期的结果:

2014-06-07 14:20:50.677995 Queue size is: 0
127.0.0.1 - - [07/Jun/2014 14:20:51] "POST /addMessage/X HTTP/1.1" 200 -
2014-06-07 14:20:51.679277 Queue size is: 1
2014-06-07 14:20:52.680425 Queue size is: 1
2014-06-07 14:20:53.681566 Queue size is: 1
2014-06-07 14:20:54.682708 Queue size is: 1
127.0.0.1 - - [07/Jun/2014 14:20:55] "POST /addMessage/Y HTTP/1.1" 200 -
2014-06-07 14:20:55.687755 Queue size is: 2
2014-06-07 14:20:56.688867 Queue size is: 2
Run Code Online (Sandbox Code Playgroud)

但是,当使用app执行时uwsgi,我会在日志中获得以下内容:

2014-06-07 14:17:42.056863 Queue size is: 0
2014-06-07 14:17:43.057952 Queue size is: 0
[pid: 9879|app: 0|req: 6/6] 127.0.0.1 () {24 vars in 280 bytes} [Sat Jun  7 14:17:43 2014] POST /addMessage/X => generated 16 bytes in 0 msecs (HTTP/1.1 200) 2 headers in 71 bytes (1 switches on core 0)
2014-06-07 14:17:44.059037 Queue size is: 0
2014-06-07 14:17:45.060118 Queue size is: 0
[pid: 9879|app: 0|req: 7/7] 127.0.0.1 () {24 vars in 280 bytes} [Sat Jun  7 14:17:45 2014] POST /addMessage/X => generated 16 bytes in 0 msecs (HTTP/1.1 200) 2 headers in 71 bytes (1 switches on core 0)
2014-06-07 14:17:46.061205 Queue size is: 0
2014-06-07 14:17:47.062286 Queue size is: 0
Run Code Online (Sandbox Code Playgroud)

在uwsgi下运行时,后台线程看不到与应用程序相同的队列.这是为什么?如何让这两个线程看到同一个Queue对象?

更新

  • 即使它作为Python脚本执行,我也会看到不一致的行为:有时它无法记录消息(使用app.logger),我只能看到prints.这意味着线程正在运行,但它无法执行任何操作app.logger.

uwsgi .ini配置

[uwsgi]
http-socket    = :9002
plugin         = python
wsgi-file      = /home/ubuntu/threadtest-uwsgi.py
enable-threads = true
workers        = 1
chdir          = /home/ubuntu/thread-tester/thread_tester
Run Code Online (Sandbox Code Playgroud)

from flask import Flask, jsonify
import Queue
from threading import Thread
import time
import datetime
import logging
import sys

logging.basicConfig(stream=sys.stderr,
                    format='%(asctime)s %(levelname)s - %(message)s')

app = Flask(__name__)
messages = Queue.Queue()

def print_queue_size():
    while True:
        app.logger.debug("%s Queue size is: %d" % (datetime.datetime.now(),
                                        messages.qsize()))
        time.sleep(1)

t = Thread(target=print_queue_size, args=())
t.setDaemon(True)
t.start()

@app.route("/queueSize", methods=["GET"])
def get_queue_size():
    return jsonify({"qsize": messages.qsize()}), 200

@app.route("/addMessage/<message>", methods=["POST"])
def add_message_to_queue(message):
    messages.put(message)
    return jsonify({"qsize": messages.qsize()}), 200

if __name__ == "__main__":
    app.run(port=6000)
Run Code Online (Sandbox Code Playgroud)

Mar*_*ers 12

事物知识文档页面:

uWSGI尽可能尝试(ab)使用fork()调用的Copy On Write语义.默认情况下,它会在加载应用程序之后进行分叉,以尽可能多地共享内存.如果由于某种原因这种行为是不合需要的,请使用该lazy选项.这将指示uWSGI在每个worker的fork()之后加载应用程序.延迟模式改变了正常重新加载的工作方式:不是重新加载整个实例,而是在链中重新加载每个工作程序.如果你想要"延迟应用程序加载",但想要保持标准的uWSGI重新加载行为,从1.3开始就可以使用该lazy-apps选项.

您的Flask应用程序在uWSGI启动时启动,然后分叉一个工作进程.在分叉时,Queue对象为空,不再与原始进程共享.线程没有被带走.

尝试设置lazy-apps选项 以延迟加载Flask应用程序,直到启动工作程序.