龙卷风异步队列不等待

sku*_*erk 1 python queue tornado

我从这个 Tornado文档中使用生产者和消费者修改了示例队列,但传递给 get() 的超时参数似乎根本不起作用,因为消费者在抛出异常之前不会等待 10 秒。理想情况下,生产者和消费者将同时运行。另外,我不知道是将超时参数作为秒还是毫秒传递:

from tornado import gen
from tornado.ioloop import IOLoop
from tornado.queues import Queue

q = Queue()

@gen.coroutine
def consumer():
    try:
        while True:
            item = yield q.get(timeout=10000)
            try:
                print('Doing work on %s' % item)      
            finally:
                q.task_done()
    except gen.TimeoutError:
        print('timeout')
        return

@gen.coroutine
def producer():
    for item in range(5):
        yield q.put(item)
        print('Put %s' % item)
        yield gen.sleep(2)

@gen.coroutine
def main():
    # Start consumer without waiting (since it never finishes).
    IOLoop.current().spawn_callback(consumer)
    yield producer()     # Wait for producer to put all tasks.
    yield q.join()       # Wait for consumer to finish all tasks.
    print('Done')

IOLoop.current().run_sync(main)
Run Code Online (Sandbox Code Playgroud)

这是它的执行:

Put 0
Doing work on 0
timeout
Put 1
Put 2
Put 3
Put 4
Run Code Online (Sandbox Code Playgroud)

kwa*_*nek 5

超时

正如您在Tornado' Queue.get docs 中所读到的:

返回一个 Future ,它在项目可用时解决或在超时后引发 tornado.gen.TimeoutError 。

但这可能会产生误导,因为timeout实际上是一个deadline. 因此必须使用datetime.timedelta 对象指定它:

import datetime
yield q.get(timeout=datetime.timedelta(seconds=1))
Run Code Online (Sandbox Code Playgroud)

或绝对时间:

timeout = 1.5  # in seconds, floats acceptable
deadline = IOLoop.current().time() + timeout
# in most cases IOLoop time is just time.time()
# I've used separate variables only for the notion

yield q.get(timeout=deadline)
Run Code Online (Sandbox Code Playgroud)

在合并到 Tornado 的Toro 中,这个参数被称为deadline

在您的代码中,您指定了 timeout 10000,这意味着截止日期为Thu, 01 Jan 1970 02:46:40 GMT

消费者循环

由于您对整个功能(包括循环)进行了try/except阻止while,因此当TimeoutError您的消费者停止工作时。将异常处理移动到while循环中。

工作示例:

from tornado import gen
from tornado.ioloop import IOLoop
from tornado.queues import Queue

q = Queue()

@gen.coroutine
def consumer():
    i = 0
    while True:
        i += 1
        print('get cycle %s' % i)
        try:
            item = yield q.get(IOLoop.instance().time() + 3)
            try:
                print('Doing work on %s' % item)
            finally:
                q.task_done()
        except gen.TimeoutError:
            print('timeout')

@gen.coroutine
def producer():
    for item in range(5):
        yield q.put(item)
        print('Put %s' % item)
        yield gen.sleep(2)

@gen.coroutine
def main():
    # Start consumer without waiting (since it never finishes).
    IOLoop.current().spawn_callback(consumer)
    yield producer()     # Wait for producer to put all tasks.
    yield q.join()       # Wait for consumer to finish all tasks.
    print('Done')
Run Code Online (Sandbox Code Playgroud)