Tornado 协程 - 自定义函数

Reg*_*ult 1 python asynchronous tornado

我正在从 Tornado 理解协程,所以让我们保持一切简单,粘贴的代码越多越好。

我想要的是让我自制的函数异步。

我可以在文档中找到的所有示例都属于同一个“隐藏”部分:AsyncHTTPClient。我不打算进行 HTTP 调用。所以请不要给我举那个班级的例子。我有兴趣从头开始创造一些东西。我已经尝试了Tornado 协程的所有可能性

现在我一直在用 bash sleep 进行测试。这是代码:

import tornado.web
import tornado.httpserver
import tornado.gen
import tornado.concurrent
import subprocess
import os

@tornado.gen.coroutine
def letswait():
    fut = tornado.concurrent.Future()
    subprocess.check_output(["sleep", "5"])
    fut.set_result(42)
    return fut

class TestHandler1(tornado.web.RequestHandler):
    @tornado.gen.coroutine
    def get(self):
        value = yield letswait()
        self.render("test.html", num=value)

class TestHandler2(tornado.web.RequestHandler):
    def get(self):
        self.render("test.html", num=66)

class Application(tornado.web.Application):
    def __init__(self):
        DIRNAME = os.path.dirname(__file__)
        STATIC_PATH = os.path.join(DIRNAME, '../static')
        TEMPLATE_PATH = os.path.join(DIRNAME, '../template')
        sets = {
            "template_path":TEMPLATE_PATH,
            "static_path":STATIC_PATH,
            "debug":True,
        }
        tornado.web.Application.__init__(self, [
            (r"/test1", TestHandler1),
            (r"/test2", TestHandler2),
        ], **sets)

def main():
    http_server = tornado.httpserver.HTTPServer(Application())
    http_server.listen(8888)
    print "Let s start"
    tornado.ioloop.IOLoop.instance().start()

if __name__ == "__main__":
    main()
Run Code Online (Sandbox Code Playgroud)

但是,如果我访问 test1,那么我需要等待调用返回才能访问 test2。据我所知,我需要使用gen.sleep(5). 但这只是一个例子。假设不是sleep 5在 bash上运行,而是我正在运行ssh somewhere 'do_something',这需要一些时间才能运行。

有人告诉我“这个函数不是异步的”。所以我的问题是如何使自定义函数异步?

编辑:稍微搜索后,我看到这里使用了tornado.process https://gist.github.com/FZambia/5756470。但是我的子流程来自第 3 方,所以这不是我可以覆盖的。所以我的问题也是,如何将 3rd 方库与该 gen.coroutine 系统集成?

解决方案:感谢下面的评论,我有一个解决方案:

import tornado.web
import tornado.httpserver
import tornado.gen
import tornado.concurrent
import subprocess
import os

from concurrent import futures

# Create a threadpool, and this can be shared around different python files
# which will not re-create 10 threadpools when we call it.
# we can a handful of executors for running synchronous tasks

# Create a 10 thread threadpool that we can use to call any synchronous/blocking functions
executor = futures.ThreadPoolExecutor(10)

def letswait():
    result_future = tornado.concurrent.Future()
    subprocess.check_output(["sleep", "5"])
    result_future.set_result(42)
    return result_future

class TestHandler1(tornado.web.RequestHandler):
    @tornado.gen.coroutine
    def get(self):
        value = yield executor.submit(letswait)
        self.render("test.html", num=value)

class TestHandler2(tornado.web.RequestHandler):
    def get(self):
        self.render("test.html", num=66)

class Application(tornado.web.Application):
    def __init__(self):
        DIRNAME = os.path.dirname(__file__)
        STATIC_PATH = os.path.join(DIRNAME, '../static')
        TEMPLATE_PATH = os.path.join(DIRNAME, '../template')
        sets = {
            "template_path":TEMPLATE_PATH,
            "static_path":STATIC_PATH,
            "debug":True,
        }
        tornado.web.Application.__init__(self, [
            (r"/test1", TestHandler1),
            (r"/test2", TestHandler2),
        ], **sets)

def main():
    http_server = tornado.httpserver.HTTPServer(Application())
    http_server.listen(8888)
    print "Let s start"
    tornado.ioloop.IOLoop.instance().start()

if __name__ == "__main__":
    main()
Run Code Online (Sandbox Code Playgroud)

use*_*751 5

我在这里问了一个类似的问题:Python Tornado - Confused how to convert ablocking function into a non-blocking function

问题是您的函数可能受 CPU 限制,唯一的方法是使用执行程序。

from concurrent import futures

# Create a threadpool, and this can be shared around different python files
# which will not re-create 10 threadpools when we call it.
# we can a handful of executors for running synchronous tasks

# Create a 10 thread threadpool that we can use to call any synchronous/blocking functions
executor = futures.ThreadPoolExecutor(10)
Run Code Online (Sandbox Code Playgroud)

然后你可以做这样的事情:

@gen.coroutine
def get(self):
    json = yield executor.submit(some_long_running_function)
Run Code Online (Sandbox Code Playgroud)

这个任务将被搁置一旁,独立运行,因为有一个 yield 关键字,tornado 会做一些其他的事情,同时在它当前运行的和你的进程之间做一个纯线程切换。这对我来说似乎很好。

换句话说,您可以将子流程包装在执行程序中,并且它将被异步处理。

如果您不想使用执行程序,似乎您的功能需要以状态机方式实现。

另一篇文章:https : //emptysqua.re/blog/motor-internals-how-i-asyncronized-a-synchronous-library/

请注意,Momoko (Postgres) 和 Motor (MongoDB) 都是 I/O Bound。

编辑:我不确定您对 Tornado 的用途是什么。我在执行大量 I/O 时使用 Tornado,因为我受 I/O 限制。但是,我想如果您的用途更受 CPU 限制,您可能想看看 Flask。您可以轻松地使用 Gunicorn 和 Flask 来创建简单的东西,并利用多个内核。尝试在 Tornado 中使用多线程或多核可能会让您头疼,因为 Tornado 中的很多东西都不是线程安全的。

编辑 2:删除了 .result() 调用。