我正在尝试使用Celery作为Twisted应用程序的控制通道.My Twisted应用程序是一个抽象层,为各种本地运行的进程提供标准接口(通过ProcessProtocol).我想使用Celery来远程控制这个--AMQP似乎是从中央位置控制许多Twisted应用程序的理想方法,我想利用Celery的基于任务的功能,例如任务重试,子任务等.
这不符合我的计划,我希望有人可以帮我指出正确的方向来实现这个目标.
我在运行脚本时尝试实现的行为是:
'略微修改的芹菜'是芹菜的一个小修改,允许任务通过self.app.twisted访问Twisted反应器,并通过self.app.process生成进程.为了简单起见,我正在使用Celery的"独立"流程池实现,它不会为任务工作者分配新流程.
当我尝试使用Celery任务初始化ProcessProtocol(即启动外部进程)时,会出现问题.该过程正确启动,但ProcessProtocol的childDataReceived永远不会被调用.我认为这与文件描述符没有被正确继承/设置有关.
下面是一些示例代码,基于ProcessProtocol文档中的"wc"示例.它包括两个Celery任务 - 一个用于启动wc进程,另一个用于计算某些文本中的单词(使用之前启动的wc进程).
这个例子是相当有意思的,但是如果我可以使它工作,它将作为实现我的ProcessProtocols的一个良好的起点,这是长时间运行的进程,它将响应写入stdin的命令.
我通过首先运行Celery守护进程来测试它:
python2.6 mycelery.py -l info -P solo
然后,在另一个窗口中,运行一个发送两个任务的脚本:
python2.6 command_test.py
command_test.py的预期行为是执行两个命令 - 一个启动wc进程,另一个发送一些文本到CountWordsTask.实际发生的是:
任何人都可以对此有所了解,或就如何最好地使用Celery作为Twisted ProcessProtocols的控制通道提供一些建议?
为Celery编写一个Twisted支持的ProcessPool实现会更好吗?我通过reactor.callLater调用WorkerCommand.execute_from_commandline的方法是否正确,以确保在Twisted线程内发生的一切?
我已经读过AMPoule,我认为它可以提供一些这样的功能,但是如果可能的话我想坚持使用Celery,因为我在我的应用程序的其他部分使用它.
任何帮助或帮助将不胜感激!
from functools import partial
from celery.app import App
from celery.bin.celeryd import WorkerCommand
from twisted.internet import reactor
class MyCeleryApp(App):
def __init__(self, twisted, *args, **kwargs):
self.twisted = twisted
super(MyCeleryApp, self).__init__(*args, **kwargs)
def main():
get_my_app = partial(MyCeleryApp, reactor)
worker = WorkerCommand(get_app=get_my_app)
reactor.callLater(1, worker.execute_from_commandline)
reactor.run()
if …Run Code Online (Sandbox Code Playgroud)