在Tornado中创建处理队列

Sco*_*ott 7 python tornado

我正在使用Tornado Web服务器来排队需要在请求/响应周期之外处理的项目.

在下面的简化示例中,每次请求进入时,我都会在名为的列表中添加一个新字符串queued_items.我想创建一些能够观察该列表并处理其中显示的项目的内容.

(在我的实际代码中,项目通过TCP套接字进行处理和发送,当Web请求到达时,可能会或可能不会连接.我希望Web服务器不管套接字连接如何都会保持排队项目

我试图保持这个代码简单,不使用像Redis或Beanstalk这样的外部队列/程序.它不会有很高的音量.

使用Tornado成语来查看client.queued_items新项目列表并在它们到达时处理它们的好方法是什么?

import time

import tornado.ioloop
import tornado.gen
import tornado.web

class Client():

    def __init__(self):
        self.queued_items = []

    @tornado.gen.coroutine
    def watch_queue(self):
        # I have no idea what I'm doing
        items = yield client.queued_items
        # go_do_some_thing_with_items(items)

class IndexHandler(tornado.web.RequestHandler):

    def get(self):
        client.queued_items.append("%f" % time.time())
        self.write("Queued a new item")

if __name__ == "__main__":

    client = Client()

    # Watch the queue for when new items show up
    client.watch_queue()

    # Create the web server 
    application = tornado.web.Application([
        (r'/', IndexHandler),
    ], debug=True)

    application.listen(8888)
    tornado.ioloop.IOLoop.instance().start()
Run Code Online (Sandbox Code Playgroud)

dan*_*ano 11

有一个名为的库toro,它提供了同步原语tornado.[更新:截至龙卷风4.2,toro已合并为tornado.]

听起来像是你可以只使用一个toro.Queue(或tornado.queues.Queuetornado4.2+)来处理这个问题:

import time

import toro
import tornado.ioloop
import tornado.gen
import tornado.web

class Client():

    def __init__(self):
        self.queued_items = toro.Queue()

    @tornado.gen.coroutine
    def watch_queue(self):
        while True:
            items = yield self.queued_items.get()
            # go_do_something_with_items(items)

class IndexHandler(tornado.web.RequestHandler):

    @tornado.gen.coroutine
    def get(self):
        yield client.queued_items.put("%f" % time.time())
        self.write("Queued a new item")

if __name__ == "__main__":

    client = Client()

    # Watch the queue for when new items show up
    tornado.ioloop.IOLoop.instance().add_callback(client.watch_queue)

    # Create the web server 
    application = tornado.web.Application([
        (r'/', IndexHandler),
    ], debug=True)

    application.listen(8888)
    tornado.ioloop.IOLoop.instance().start()
Run Code Online (Sandbox Code Playgroud)

除了将数据结构从列表切换到toro.Queue:之外,还需要进行一些调整:

  1. 我们需要安排watch_queue在IOLoop内部运行add_callback,而不是试图直接在IOLoop上下文之外调用它.
  2. IndexHandler.get需要转换为协程,因为toro.Queue.put是协程.

我还添加了一个while True循环watch_queue,以便它将永远运行,而不是只处理一个项目然后退出.