sku*_*erk 1 python queue tornado
我从这个 Tornado文档中使用生产者和消费者修改了示例队列,但传递给 get() 的超时参数似乎根本不起作用,因为消费者在抛出异常之前不会等待 10 秒。理想情况下,生产者和消费者将同时运行。另外,我不知道是将超时参数作为秒还是毫秒传递:
from tornado import gen
from tornado.ioloop import IOLoop
from tornado.queues import Queue
q = Queue()
@gen.coroutine
def consumer():
try:
while True:
item = yield q.get(timeout=10000)
try:
print('Doing work on %s' % item)
finally:
q.task_done()
except gen.TimeoutError:
print('timeout')
return
@gen.coroutine
def producer():
for item in range(5):
yield q.put(item)
print('Put %s' % item)
yield gen.sleep(2)
@gen.coroutine
def main():
# Start consumer without waiting (since it never finishes).
IOLoop.current().spawn_callback(consumer)
yield producer() # Wait for producer to put all tasks.
yield q.join() # Wait for consumer to finish all tasks.
print('Done')
IOLoop.current().run_sync(main)
Run Code Online (Sandbox Code Playgroud)
这是它的执行:
Put 0
Doing work on 0
timeout
Put 1
Put 2
Put 3
Put 4
Run Code Online (Sandbox Code Playgroud)
超时
正如您在Tornado' Queue.get docs 中所读到的:
返回一个 Future ,它在项目可用时解决或在超时后引发 tornado.gen.TimeoutError 。
但这可能会产生误导,因为timeout实际上是一个deadline. 因此必须使用datetime.timedelta 对象指定它:
import datetime
yield q.get(timeout=datetime.timedelta(seconds=1))
Run Code Online (Sandbox Code Playgroud)
或绝对时间:
timeout = 1.5 # in seconds, floats acceptable
deadline = IOLoop.current().time() + timeout
# in most cases IOLoop time is just time.time()
# I've used separate variables only for the notion
yield q.get(timeout=deadline)
Run Code Online (Sandbox Code Playgroud)
在合并到 Tornado 的Toro 中,这个参数被称为deadline。
在您的代码中,您指定了 timeout 10000,这意味着截止日期为Thu, 01 Jan 1970 02:46:40 GMT。
消费者循环
由于您对整个功能(包括循环)进行了try/except阻止while,因此当TimeoutError您的消费者停止工作时。将异常处理移动到while循环中。
工作示例:
from tornado import gen
from tornado.ioloop import IOLoop
from tornado.queues import Queue
q = Queue()
@gen.coroutine
def consumer():
i = 0
while True:
i += 1
print('get cycle %s' % i)
try:
item = yield q.get(IOLoop.instance().time() + 3)
try:
print('Doing work on %s' % item)
finally:
q.task_done()
except gen.TimeoutError:
print('timeout')
@gen.coroutine
def producer():
for item in range(5):
yield q.put(item)
print('Put %s' % item)
yield gen.sleep(2)
@gen.coroutine
def main():
# Start consumer without waiting (since it never finishes).
IOLoop.current().spawn_callback(consumer)
yield producer() # Wait for producer to put all tasks.
yield q.join() # Wait for consumer to finish all tasks.
print('Done')
Run Code Online (Sandbox Code Playgroud)
| 归档时间: |
|
| 查看次数: |
1149 次 |
| 最近记录: |