使用ThreadPoolExecutor的龙卷风

del*_*boy 13 python io multithreading tornado

我有Tornado用作http服务器和自定义http框架的设置.想法是拥有单个龙卷风处理程序,并且每个到达的请求都应该只是提交给ThreadPoolExecutor并留下Tornado来监听新的请求.一旦线程完成处理请求,就会调用回调,该回调会在执行IO循环的同一线程中向客户端发送响应.

剥离,代码看起来像这样.基本http服务器类:

class HttpServer():
    def __init__(self, router, port, max_workers):
        self.router = router
        self.port = port
        self.max_workers = max_workers

    def run(self):
        raise NotImplementedError()
Run Code Online (Sandbox Code Playgroud)

Tornado支持HttpServer的实现:

class TornadoServer(HttpServer):
    def run(self):
        executor = futures.ThreadPoolExecutor(max_workers=self.max_workers)

        def submit(callback, **kwargs):
            future = executor.submit(Request(**kwargs))
            future.add_done_callback(callback)
            return future

        application = web.Application([
            (r'(.*)', MainHandler, {
                'submit': submit,
                'router': self.router   
            })
        ])

        application.listen(self.port)

        ioloop.IOLoop.instance().start()
Run Code Online (Sandbox Code Playgroud)

处理所有龙卷风请求的主处理程序(仅实现GET,但其他请求相同):

class MainHandler():
    def initialize(self, submit, router):
        self.submit = submit
        self.router = router

    def worker(self, request):
        responder, kwargs = self.router.resolve(request)
        response = responder(**kwargs)
        return res

    def on_response(self, response):
        # when this is called response should already have result
        if isinstance(response, Future):
            response = response.result()
        # response is my own class, just write returned content to client
        self.write(response.data)
        self.flush()
        self.finish()

    def _on_response_ready(self, response):
        # schedule response processing in ioloop, to be on ioloop thread
        ioloop.IOLoop.current().add_callback(
            partial(self.on_response, response)
        )

    @web.asynchronous
    def get(self, url):
        self.submit(
            self._on_response_ready, # callback
            url=url, method='post', original_request=self.request
        )
Run Code Online (Sandbox Code Playgroud)

服务器的启动类似于:

router = Router()
server = TornadoServer(router, 1111, max_workers=50)
server.run()
Run Code Online (Sandbox Code Playgroud)

因此,正如您所看到的,主处理程序只是将每个请求提交给线程池,并且在处理完成时调用回调(_on_response_ready),它只调度请求完成在IO循环上执行(以确保它在相同的线程上完成)正在执行IO循环).

这有效.至少它看起来像它.

我的问题是ThreadPoolExecutor中有关max worker的性能.

所有处理程序都是IO绑定的,没有计算正在进行(它们主要等待数据库或外部服务),因此对于50名工作人员,我希望50个concurent请求比仅有一个worker的50个concurent请求快50倍.

但事实并非如此.我看到的是,当我有50个线程池工作者和1个工作者时,每秒几乎相同的请求.

为了测量,我使用了Apache-Bench,例如:

ab -n 100 -c 10 http://localhost:1111/some_url
Run Code Online (Sandbox Code Playgroud)

有谁知道我做错了什么?我是否误解了Tornado或ThreadPool的工作原理?还是组合?

J_H*_*J_H 1

正如 kwarunek 所建议的,postgres 的 momoko 包装器解决了这个问题。如果您想从外部协作者那里征求进一步的调试建议,那么从每次数据库访问之前执行 sleep(10) 的测试任务中发布带时间戳的调试日志将有所帮助。