Making Tornado serve requests on separate threads

Nav*_*h G 1 · python, web-services, tornado, flask

I have a webservice written in Flask, wrapped in a WSGIContainer and served by Tornado using its FallbackHandler mechanism. One of the routes in my Flask webservice runs a very long operation (it takes about 5 minutes to complete), and while that route is being handled, every call to any other route is blocked until the operation completes. How can I work around this?

Here is how the Flask application is served with Tornado:

import os

from tornado.options import parse_command_line
from tornado.web import Application, FallbackHandler, StaticFileHandler
from tornado.wsgi import WSGIContainer

parse_command_line()

frontend_path = os.path.join(os.path.dirname(__file__), "..", "webapp")

rest_app = WSGIContainer(app)  # app is the Flask application
tornado_app = Application(
    [
        (r"/api/(.*)", FallbackHandler, dict(fallback=rest_app)),
        (r"/app/(.*)", StaticFileHandler, dict(path=frontend_path)),
    ]
)
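For illustration, a hypothetical route like the one below (the name and delay are made up, not from the original question) is enough to reproduce the problem: while it runs, Tornado's single-threaded IOLoop is stuck inside the WSGIContainer call, so every other request is blocked.

import time
from flask import Flask

app = Flask(__name__)

@app.route("/slow")
def slow():
    # Stand-in for the real five-minute operation; when served through
    # WSGIContainer this blocks the IOLoop for its entire duration.
    time.sleep(300)
    return "done"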

skr*_*use 5

I created a custom WSGIHandler that supports multithreaded requests for WSGI applications in Tornado by using a ThreadPoolExecutor. All calls into the WSGI application are executed on separate threads, so the main loop stays free even while your WSGI responses take a long time to complete. The code below is based on this Gist, extended so that:

  • You can stream responses (using an iterator response) or large files directly from the WSGI application to the client, which keeps memory usage low even while generating huge responses.
  • You can upload large files. Once the request body grows beyond 1 MB, the whole request body is dumped into a temporary file, which is then passed on to the WSGI application.

So far the code has only been tested on Python 3.4, so I do not know whether it works on Python 2.7. It has not been stress-tested either, but it seems to work fine so far.

# tornado_wsgi.py

import itertools
import logging
import sys
import tempfile
from concurrent import futures
from io import BytesIO

from tornado import escape, gen, web
from tornado.iostream import StreamClosedError
from tornado.wsgi import to_wsgi_str

_logger = logging.getLogger(__name__)


@web.stream_request_body
class WSGIHandler(web.RequestHandler):
    thread_pool_size = 20

    def initialize(self, wsgi_application):
        self.wsgi_application = wsgi_application

        self.body_chunks = []
        self.body_tempfile = None

    def environ(self, request):
        """
        Converts a `tornado.httputil.HTTPServerRequest` to a WSGI environment.
        """
        hostport = request.host.split(":")
        if len(hostport) == 2:
            host = hostport[0]
            port = int(hostport[1])
        else:
            host = request.host
            port = 443 if request.protocol == "https" else 80

        if self.body_tempfile is not None:
            body = self.body_tempfile
            body.seek(0)
        elif self.body_chunks:
            body = BytesIO(b''.join(self.body_chunks))
        else:
            body = BytesIO()

        environ = {
            "REQUEST_METHOD": request.method,
            "SCRIPT_NAME": "",
            "PATH_INFO": to_wsgi_str(escape.url_unescape(request.path, encoding=None, plus=False)),
            "QUERY_STRING": request.query,
            "REMOTE_ADDR": request.remote_ip,
            "SERVER_NAME": host,
            "SERVER_PORT": str(port),
            "SERVER_PROTOCOL": request.version,
            "wsgi.version": (1, 0),
            "wsgi.url_scheme": request.protocol,
            "wsgi.input": body,
            "wsgi.errors": sys.stderr,
            "wsgi.multithread": False,
            "wsgi.multiprocess": True,
            "wsgi.run_once": False,
        }
        if "Content-Type" in request.headers:
            environ["CONTENT_TYPE"] = request.headers.pop("Content-Type")
        if "Content-Length" in request.headers:
            environ["CONTENT_LENGTH"] = request.headers.pop("Content-Length")
        for key, value in request.headers.items():
            environ["HTTP_" + key.replace("-", "_").upper()] = value
        return environ

    def prepare(self):
        # Accept up to 2GB upload data.
        self.request.connection.set_max_body_size(2 << 30)

    @gen.coroutine
    def data_received(self, chunk):
        if self.body_tempfile is not None:
            yield self.executor.submit(lambda: self.body_tempfile.write(chunk))
        else:
            self.body_chunks.append(chunk)

            # When the request body grows larger than 1 MB, we dump all received chunks
            # into a temporary file to prevent high memory use. All subsequent body
            # chunks will be written directly to the tempfile.
            if sum(len(c) for c in self.body_chunks) > (1 << 20):
                self.body_tempfile = tempfile.NamedTemporaryFile('w+b')
                def copy_to_file():
                    for c in self.body_chunks:
                        self.body_tempfile.write(c)
                    # Remove the chunks to clear the memory.
                    self.body_chunks[:] = []
                yield self.executor.submit(copy_to_file)

    @web.asynchronous
    @gen.coroutine
    def get(self):
        data = {}
        response = []

        def start_response(status, response_headers, exc_info=None):
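            # Record the status line and headers; return a write() callable
            # (response.append) for WSGI apps that use the legacy write API.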
            data['status'] = status
            data['headers'] = response_headers
            return response.append

        environ = self.environ(self.request)
        app_response = yield self.executor.submit(self.wsgi_application, environ, start_response)
        app_response = iter(app_response)

        if not data:
            raise Exception('WSGI app did not call start_response')

        try:
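            # StopIteration raised inside next() cannot propagate cleanly across
            # the executor/coroutine boundary, so next_chunk() signals exhaustion
            # by returning this sentinel object instead.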
            exhausted = object()

            def next_chunk():
                try:
                    return next(app_response)
                except StopIteration:
                    return exhausted

            for i in itertools.count():
                chunk = yield self.executor.submit(next_chunk)
                if i == 0:
                    status_code, reason = data['status'].split(None, 1)
                    status_code = int(status_code)
                    headers = data['headers']
                    self.set_status(status_code, reason)
                    for key, value in headers:
                        self.set_header(key, value)
                    c = b''.join(response)
                    if c:
                        self.write(c)
                        yield self.flush()
                if chunk is not exhausted:
                    self.write(chunk)
                    yield self.flush()
                else:
                    break
        except StreamClosedError:
            _logger.debug('stream closed early')
        finally:
            # Close the temporary file to make sure that it gets deleted.
            if self.body_tempfile is not None:
                try:
                    self.body_tempfile.close()
                except OSError as e:
                    _logger.warning(e)

            if hasattr(app_response, 'close'):
                yield self.executor.submit(app_response.close)

    post = put = delete = head = options = get

    @property
    def executor(self):
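        # Lazily create a single ThreadPoolExecutor that is shared by all
        # instances of this handler class; every WSGI call runs on these threads.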
        cls = type(self)
        if not hasattr(cls, '_executor'):
            cls._executor = futures.ThreadPoolExecutor(cls.thread_pool_size)
        return cls._executor
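To plug this into the setup from the question, the FallbackHandler route can be swapped for this WSGIHandler. One caveat (an assumption based on the code above, not something from the original answer): get() takes no positional arguments, so the URL pattern must not contain capture groups; use r"/api/.*" rather than r"/api/(.*)". A minimal sketch:

from tornado.web import Application, StaticFileHandler
from tornado_wsgi import WSGIHandler

tornado_app = Application(
    [
        (r"/api/.*", WSGIHandler, dict(wsgi_application=app)),  # app is the Flask app
        (r"/app/(.*)", StaticFileHandler, dict(path=frontend_path)),
    ]
)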

Below is a simple Flask application that demonstrates the WSGIHandler. The hello() function blocks for one second, so with a ThreadPoolExecutor of 20 threads you will be able to serve 20 requests at the same time (within one second).

The stream() function creates an iterator response and streams 50 chunks of data to the client over 5 seconds. Note that you probably cannot use Flask's stream_with_context decorator here: since every read from the iterator results in a new executor.submit(), it is very likely that different chunks of a streaming response will be read on different threads, which would break Flask's use of thread-locals.

import time
from flask import Flask, Response
from tornado import ioloop, log, web
from tornado_wsgi import WSGIHandler

def main():
    app = Flask(__name__)

    @app.route("/")
    def hello():
        time.sleep(1)
        return "Hello World!"

    @app.route("/stream")
    def stream():
        def generate():
            for i in range(50):
                time.sleep(0.1)
                yield '%d\n' % i
        return Response(generate(), mimetype='text/plain')

    application = web.Application([
        (r'/.*', WSGIHandler, {'wsgi_application': app}),
    ])

    log.enable_pretty_logging()
    application.listen(8888)
    ioloop.IOLoop.instance().start()

if __name__ == '__main__':
    main()
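To verify that requests really are served concurrently, a quick test sketch (standard library only; not part of the original answer) can fire 20 requests at once. With the 20-thread pool they should all complete in roughly one second instead of twenty:

import time
from concurrent import futures
from urllib.request import urlopen

def fetch(_):
    # Each call blocks for about one second inside the hello() route.
    return urlopen("http://localhost:8888/").read()

start = time.time()
with futures.ThreadPoolExecutor(20) as pool:
    list(pool.map(fetch, range(20)))
print("20 requests took %.1f seconds" % (time.time() - start))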