Rus*_*m K 12 python asynchronous tornado nonblocking celery
Celery包含一个能够使用amqp或其他一些芹菜后端发出异步HTTP请求的模块.我正在使用tornado-celery生产者进行异步消息发布.据我所知,龙卷风芹菜使用鼠兔为此.问题是如何使celery.task.http.URL适应龙卷风(使其无阻塞).基本上有两个地方需要改进:
HttpDispatch.make_request() 必须使用tornado async http客户端实现;URL.get_async(**kw)或者URL.post_async(**kw)必须使用tornado API重新实现相应的非阻塞代码.例如:
class NonBlockingURL(celery.task.http.URL):
@gen.coroutine
def post_async(self, **kwargs):
async_res = yield gen.Task(self.dispatcher.delay,
str(self), 'POST', **kwargs)
raise gen.Return(async_res)
Run Code Online (Sandbox Code Playgroud)但我无法理解如何以恰当和简洁的方式做到这一点.如何使其完全像非异步的非阻塞?顺便说一句,我正在使用amqp后端.
请给我一个很好的指导,甚至更好,一个例子.
事实上,你必须决定是使用 Tornado 的异步方法还是使用像 cellery 这样的队列。使用两者是没有意义的,因为队列会快速回答队列的状态,因此龙卷风在等待队列响应时没有做其他事情的意义。要在两种解决方案之间做出决定,我会说:
Celery:更加模块化,易于分发到不同的核心或不同的机器,任务可以被其他人使用,而不是tornado,你必须安装并保持运行软件(amqp,cellery工人...)
Tornado 中的异步:更加整体化,一个程序可以完成所有操作,代码更短,一个程序可以运行
要使用Tornado的异步方法,请参阅文档。这是结合使用芹菜和龙卷风的简短解决方案:
from celery import Celery,current_task
import time
celery=Celery('tasks',backend='amqp',result_backend='amqp')
@celery.task
def MyTask(url,resid):
for i in range(10):
time.sleep(1)
current_task.update_state(state='running',meta={'i': i})
return 'done'
Run Code Online (Sandbox Code Playgroud)
import tasks
from tornado import RequestHandler,....
from tornado.web import Application
dictasks={}
class runtask(RequestHandler):
def post(self):
i=len(dictasks)
dictasks[i]=task.MyTask.delay()
self.write(i)
class chktask(RequestHandler):
def get(self,i):
i=int(i)
if dictasks[i].ready():
self.write(dictasks[i].result)
del dictasks[i]
else:
self.write(dictasks[i].state + ' i:' + dictasks[i].info.get('i',-1))
Application = Application([
(r"/runtask", runtask}),
(r"/chktask/([0-9]+)", chktask),
etc.
Run Code Online (Sandbox Code Playgroud)
| 归档时间: |
|
| 查看次数: |
592 次 |
| 最近记录: |