Nav*_*h G 1 python web-services tornado flask
我有一个用 Flask 编写的 WebService,它被包裹在 WSGIContainer 中,并通过 Tornado 的 FallbackHandler 机制提供服务。我在这个 Flask WebService 的某个路由中运行一个非常耗时的操作(大约需要 5 分钟才能完成)。当触发该路由时,对其他任何路由的调用都会被阻塞,直到该操作完成为止。我该如何解决这个问题?
以下是使用Tornado提供Flask应用程序的方法:
# Parse Tornado's standard command-line options (logging level, etc.).
parse_command_line()
# NOTE(review): assumes a sibling "webapp" directory next to this file's parent — confirm layout.
frontend_path = os.path.join(os.path.dirname(__file__),"..","webapp")
# Wrap the Flask WSGI application so Tornado can delegate requests to it.
rest_app = WSGIContainer(app)
# Route /api/* to the wrapped Flask app (via FallbackHandler) and /app/* to static frontend files.
tornado_app = Application(
    [
        (r"/api/(.*)", FallbackHandler, dict(fallback=rest_app)),
        (r"/app/(.*)", StaticFileHandler, dict(path=frontend_path))
    ]
)
Run Code Online (Sandbox Code Playgroud)
我创建了一个自定义的 WSGIHandler,它使用 ThreadPoolExecutor 为 Tornado 中的 WSGI 应用程序提供多线程请求支持。对 WSGI 应用程序的所有调用都在单独的线程中执行,因此即使 WSGI 响应需要很长时间,主事件循环也能保持空闲。以下代码基于此 Gist 并在其基础上进行了扩展。
目前代码只测试过Python 3.4,所以我不知道它是否适用于Python 2.7.它还没有经过压力测试,但到目前为止似乎工作正常.
# tornado_wsgi.py
import itertools
import logging
import sys
import tempfile
from concurrent import futures
from io import BytesIO
from tornado import escape, gen, web
from tornado.iostream import StreamClosedError
from tornado.wsgi import to_wsgi_str
_logger = logging.getLogger(__name__)
@web.stream_request_body
class WSGIHandler(web.RequestHandler):
    """Tornado RequestHandler that runs a WSGI application on a thread pool.

    Every call into the WSGI application (the app callable itself, each
    iteration of its response iterable, and its ``close()``) is submitted to a
    shared ``ThreadPoolExecutor``, so a slow WSGI response does not block the
    Tornado IOLoop. Request bodies are streamed in via ``data_received`` and
    spilled to a temporary file once they exceed 1 MB.

    NOTE(review): this targets legacy Tornado — ``web.asynchronous`` and
    ``tornado.wsgi.to_wsgi_str`` were removed in Tornado 6; confirm the
    installed Tornado version before reusing.
    """

    # Size of the class-level thread pool shared by all requests (see `executor`).
    thread_pool_size = 20

    def initialize(self, wsgi_application):
        """Store the WSGI callable and reset per-request body buffers."""
        self.wsgi_application = wsgi_application
        # In-memory body chunks, used until the body grows past 1 MB.
        self.body_chunks = []
        # Tempfile used for large bodies; None while the body is still in memory.
        self.body_tempfile = None

    def environ(self, request):
        """
        Converts a `tornado.httputil.HTTPServerRequest` to a WSGI environment.
        """
        # Split an explicit "host:port"; otherwise infer the default port
        # from the scheme.
        hostport = request.host.split(":")
        if len(hostport) == 2:
            host = hostport[0]
            port = int(hostport[1])
        else:
            host = request.host
            port = 443 if request.protocol == "https" else 80
        # Pick the body source: spilled tempfile (rewound), buffered chunks,
        # or an empty stream.
        if self.body_tempfile is not None:
            body = self.body_tempfile
            body.seek(0)
        elif self.body_chunks:
            body = BytesIO(b''.join(self.body_chunks))
        else:
            body = BytesIO()
        environ = {
            "REQUEST_METHOD": request.method,
            "SCRIPT_NAME": "",
            "PATH_INFO": to_wsgi_str(escape.url_unescape(request.path, encoding=None, plus=False)),
            "QUERY_STRING": request.query,
            "REMOTE_ADDR": request.remote_ip,
            "SERVER_NAME": host,
            "SERVER_PORT": str(port),
            "SERVER_PROTOCOL": request.version,
            "wsgi.version": (1, 0),
            "wsgi.url_scheme": request.protocol,
            "wsgi.input": body,
            "wsgi.errors": sys.stderr,
            # NOTE(review): calls run on a thread pool, so "multithread: False"
            # looks wrong per PEP 3333 — confirm intent.
            "wsgi.multithread": False,
            "wsgi.multiprocess": True,
            "wsgi.run_once": False,
        }
        # Content-Type/Content-Length get dedicated WSGI keys (no HTTP_ prefix).
        # NOTE(review): `pop` mutates the Tornado request's headers in place.
        if "Content-Type" in request.headers:
            environ["CONTENT_TYPE"] = request.headers.pop("Content-Type")
        if "Content-Length" in request.headers:
            environ["CONTENT_LENGTH"] = request.headers.pop("Content-Length")
        # Remaining headers become HTTP_* variables per the WSGI convention.
        for key, value in request.headers.items():
            environ["HTTP_" + key.replace("-", "_").upper()] = value
        return environ

    def prepare(self):
        # Accept up to 2GB upload data.
        self.request.connection.set_max_body_size(2 << 30)

    @gen.coroutine
    def data_received(self, chunk):
        """Buffer one streamed body chunk (memory first, tempfile past 1 MB)."""
        if self.body_tempfile is not None:
            # Already spilled: write the chunk on the executor so file I/O
            # does not block the IOLoop.
            yield self.executor.submit(lambda: self.body_tempfile.write(chunk))
        else:
            self.body_chunks.append(chunk)
            # When the request body grows larger than 1 MB we dump all receiver chunks into
            # a temporary file to prevent high memory use. All subsequent body chunks will
            # be directly written into the tempfile.
            if sum(len(c) for c in self.body_chunks) > (1 << 20):
                self.body_tempfile = tempfile.NamedTemporaryFile('w+b')

                def copy_to_file():
                    for c in self.body_chunks:
                        self.body_tempfile.write(c)
                    # Remove the chunks to clear the memory.
                    self.body_chunks[:] = []

                yield self.executor.submit(copy_to_file)

    @web.asynchronous
    @gen.coroutine
    def get(self):
        """Invoke the WSGI app on the executor and stream its response out."""
        # `data` captures status/headers from start_response; `response`
        # collects anything the app pushes through the write callable.
        data = {}
        response = []

        def start_response(status, response_headers, exc_info=None):
            data['status'] = status
            data['headers'] = response_headers
            # Per PEP 3333 start_response returns a write(body_data) callable.
            return response.append

        environ = self.environ(self.request)
        # Run the (potentially slow) WSGI callable off the IOLoop thread.
        app_response = yield self.executor.submit(self.wsgi_application, environ, start_response)
        app_response = iter(app_response)
        if not data:
            raise Exception('WSGI app did not call start_response')
        try:
            # Sentinel marking the end of the WSGI response iterable.
            exhausted = object()

            def next_chunk():
                try:
                    return next(app_response)
                except StopIteration:
                    return exhausted

            for i in itertools.count():
                # Each next() on the app's iterable also runs on the executor,
                # so slow streaming responses don't block the loop either.
                chunk = yield self.executor.submit(next_chunk)
                if i == 0:
                    # First iteration: commit status, headers and any data
                    # written via the write callable.
                    # NOTE(review): split(None, 1) assumes the status line has
                    # a reason phrase (e.g. "200 OK"); a bare "200" would raise
                    # ValueError — confirm upstream apps always include one.
                    status_code, reason = data['status'].split(None, 1)
                    status_code = int(status_code)
                    headers = data['headers']
                    self.set_status(status_code, reason)
                    for key, value in headers:
                        self.set_header(key, value)
                    c = b''.join(response)
                    if c:
                        self.write(c)
                    yield self.flush()
                if chunk is not exhausted:
                    self.write(chunk)
                    yield self.flush()
                else:
                    break
        except StreamClosedError:
            # Client went away mid-stream; not an error worth raising.
            _logger.debug('stream closed early')
        finally:
            # Close the temporary file to make sure that it gets deleted.
            if self.body_tempfile is not None:
                try:
                    self.body_tempfile.close()
                except OSError as e:
                    _logger.warning(e)
            # Honor the WSGI contract: call close() on the app's iterable.
            if hasattr(app_response, 'close'):
                yield self.executor.submit(app_response.close)

    # All HTTP verbs share the same WSGI dispatch path.
    post = put = delete = head = options = get

    @property
    def executor(self):
        """Lazily created ThreadPoolExecutor shared by the whole class."""
        cls = type(self)
        if not hasattr(cls, '_executor'):
            cls._executor = futures.ThreadPoolExecutor(cls.thread_pool_size)
        return cls._executor
Run Code Online (Sandbox Code Playgroud)
以下是一个演示该 WSGIHandler 的简单 Flask 应用程序。hello() 函数会阻塞一秒钟,因此如果 ThreadPoolExecutor 使用 20 个线程,你将能够同时(在一秒之内)处理 20 个请求。
stream() 函数创建一个迭代器形式的响应,在 5 秒内向客户端流式传输 50 块数据。需要注意的是,这里可能无法使用 Flask 的 stream_with_context 装饰器:由于对迭代器的每次取值都会产生一个新的 executor.submit(),流式响应的不同数据块很可能会在不同的线程中加载,这会破坏 Flask 对线程局部变量(thread-locals)的使用。
import time
from flask import Flask, Response
from tornado import ioloop, log, web
from tornado_wsgi import WSGIHandler
def main():
    """Serve a small demo Flask application through WSGIHandler on port 8888.

    ``/`` blocks its worker thread for one second; ``/stream`` streams 50
    numbered lines over roughly five seconds.
    """
    flask_app = Flask(__name__)

    @flask_app.route("/")
    def hello():
        # Sleep on the worker thread; other pool threads keep serving requests.
        time.sleep(1)
        return "Hello World!"

    @flask_app.route("/stream")
    def stream():
        def numbers():
            # Emit one line every 0.1 s, 50 lines total.
            for n in range(50):
                time.sleep(0.1)
                yield '%d\n' % n

        return Response(numbers(), mimetype='text/plain')

    log.enable_pretty_logging()
    tornado_app = web.Application([
        (r'/.*', WSGIHandler, {'wsgi_application': flask_app}),
    ])
    tornado_app.listen(8888)
    ioloop.IOLoop.instance().start()
# Start the demo server only when this file is executed as a script.
if __name__ == '__main__':
    main()
Run Code Online (Sandbox Code Playgroud)
| 归档时间: |
|
| 查看次数: |
1941 次 |
| 最近记录: |