小编Mik*_*yan的帖子

使用Celery作为Twisted应用程序的控制通道

我正在尝试使用Celery作为Twisted应用程序的控制通道.My Twisted应用程序是一个抽象层,为各种本地运行的进程提供标准接口(通过ProcessProtocol).我想使用Celery来远程控制这个--AMQP似乎是从中央位置控制许多Twisted应用程序的理想方法,我想利用Celery的基于任务的功能,例如任务重试,子任务等.

这不符合我的计划,我希望有人可以帮我指出正确的方向来实现这个目标.

我在运行脚本时尝试实现的行为是:

  • 开始略微修改的芹菜(见下文)
  • 等待Celery任务
  • 收到"启动进程"任务后,会生成ProcessProtocol
  • 收到其他任务后,在Twisted协议上运行一个函数,并使用Deferreds返回结果

'略微修改的芹菜'是芹菜的一个小修改,允许任务通过self.app.twisted访问Twisted反应器,并通过self.app.process生成进程.为了简单起见,我正在使用Celery的"独立"流程池实现,它不会为任务工作者分配新流程.

当我尝试使用Celery任务初始化ProcessProtocol(即启动外部进程)时,会出现问题.该过程正确启动,但ProcessProtocol的childDataReceived永远不会被调用.我认为这与文件描述符没有被正确继承/设置有关.

下面是一些示例代码,基于ProcessProtocol文档中的"wc"示例.它包括两个Celery任务 - 一个用于启动wc进程,另一个用于计算某些文本中的单词(使用之前启动的wc进程).

这个例子是相当有意思的,但是如果我可以使它工作,它将作为实现我的ProcessProtocols的一个良好的起点,这是长时间运行的进程,它将响应写入stdin的命令.

我通过首先运行Celery守护进程来测试它:

python2.6 mycelery.py -l info -P solo

然后,在另一个窗口中,运行一个发送两个任务的脚本:

python2.6 command_test.py

command_test.py的预期行为是执行两个命令 - 一个启动wc进程,另一个发送一些文本到CountWordsTask.实际发生的是:

  • StartProcTask生成进程,并通过Deffered接收'process started'作为响应
  • CountWordsTask永远不会收到结果,因为从不调用childDataReceived

任何人都可以对此有所了解,或就如何最好地使用Celery作为Twisted ProcessProtocols的控制通道提供一些建议?

为Celery编写一个Twisted支持的ProcessPool实现会更好吗?我通过reactor.callLater调用WorkerCommand.execute_from_commandline的方法是否正确,以确保在Twisted线程内发生的一切?

我已经读过AMPoule,我认为它可以提供一些这样的功能,但是如果可能的话我想坚持使用Celery,因为我在我的应用程序的其他部分使用它.

任何帮助或帮助将不胜感激!

myceleryd.py

from functools import partial
from celery.app import App
from celery.bin.celeryd import WorkerCommand
from twisted.internet import reactor


class MyCeleryApp(App):
    def __init__(self, twisted, *args, **kwargs):
        self.twisted = twisted
        super(MyCeleryApp, self).__init__(*args, **kwargs)

def main():
    get_my_app = partial(MyCeleryApp, reactor)
    worker = WorkerCommand(get_app=get_my_app)
    reactor.callLater(1, worker.execute_from_commandline)
    reactor.run()

if …
Run Code Online (Sandbox Code Playgroud)

python twisted celery

11
推荐指数
1
解决办法
3005
查看次数

标签 统计

celery ×1

python ×1

twisted ×1